You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2018/10/26 06:03:12 UTC

[GitHub] peng-yongsheng closed pull request #1819: Fixed the bugs of the mesh branch.

peng-yongsheng closed pull request #1819: Fixed the bugs of the mesh branch.
URL: https://github.com/apache/incubator-skywalking/pull/1819
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index 2a99df0d7..69999b858 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -22,9 +22,9 @@
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.analysis.data.*;
+import org.apache.skywalking.oap.server.core.analysis.generated.all.AllHeatmapIndicator;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
-import org.apache.skywalking.oap.server.core.worker.WorkerInstances;
 import org.slf4j.*;
 
 /**
@@ -38,9 +38,11 @@
     private final DataCarrier<Indicator> dataCarrier;
     private final MergeDataCache<Indicator> mergeDataCache;
     private int messageNum;
+    private final String modelName;
 
     IndicatorAggregateWorker(int workerId, AbstractWorker<Indicator> nextWorker, String modelName) {
         super(workerId);
+        this.modelName = modelName;
         this.nextWorker = nextWorker;
         this.mergeDataCache = new MergeDataCache<>();
         this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
@@ -53,6 +55,10 @@
     }
 
     private void onWork(Indicator indicator) {
+        if (modelName.equals("all_heatmap")) {
+            AllHeatmapIndicator allHeatmapIndicator = (AllHeatmapIndicator)indicator;
+            logger.info("aggregate indicator: {}", allHeatmapIndicator.getDetailGroup().toStorageData());
+        }
         messageNum++;
         aggregate(indicator);
 
@@ -89,6 +95,14 @@ private void aggregate(Indicator indicator) {
         } else {
             mergeDataCache.put(indicator);
         }
+
+        mergeDataCache.getLast().collection().forEach(indicator1 -> {
+            if (modelName.equals("all_heatmap")) {
+                AllHeatmapIndicator allHeatmapIndicator = (AllHeatmapIndicator)indicator1;
+                logger.warn("aggregate indicator aggregate method: {}", allHeatmapIndicator.getDetailGroup().toStorageData());
+            }
+        });
+
         mergeDataCache.finishWriting();
     }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index 1f7b57224..e3cde52ef 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -22,6 +22,7 @@
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.analysis.data.*;
+import org.apache.skywalking.oap.server.core.analysis.generated.all.AllHeatmapIndicator;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 import org.apache.skywalking.oap.server.core.storage.IIndicatorDAO;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
@@ -54,6 +55,14 @@
         this.dataCarrier.consume(new IndicatorPersistentWorker.PersistentConsumer(this), 1);
     }
 
+    @Override void onWork(Indicator indicator) {
+        if (modelName.equals("all_heatmap")) {
+            AllHeatmapIndicator allHeatmapIndicator = (AllHeatmapIndicator)indicator;
+            logger.warn("persistent indicator: {}", allHeatmapIndicator.getDetailGroup().toStorageData());
+        }
+        super.onWork(indicator);
+    }
+
     @Override public void in(Indicator indicator) {
         indicator.setEndOfBatchContext(new EndOfBatchContext(false));
         dataCarrier.produce(indicator);
@@ -86,10 +95,23 @@ public boolean flushAndSwitch() {
             }
             try {
                 if (nonNull(dbData)) {
+                    if (modelName.equals("all_heatmap")) {
+                        AllHeatmapIndicator dbBefore = (AllHeatmapIndicator)dbData;
+                        AllHeatmapIndicator stremBefore = (AllHeatmapIndicator)data;
+                        logger.warn("need to combine, db data before: {}, stream data before: {}", dbBefore.getDetailGroup().toStorageData(), stremBefore.getDetailGroup().toStorageData());
+                    }
                     data.combine(dbData);
                     data.calculate();
+
+                    if (modelName.equals("all_heatmap")) {
+                        AllHeatmapIndicator stremAfter = (AllHeatmapIndicator)data;
+                        logger.warn("need to combine, stream data after: {}", stremAfter.getDetailGroup().toStorageData());
+                    }
                     batchCollection.add(indicatorDAO.prepareBatchUpdate(modelName, data));
                 } else {
+                    if (modelName.equals("all_heatmap")) {
+                        logger.warn("insert all_heatmap id: {}", data.id());
+                    }
                     batchCollection.add(indicatorDAO.prepareBatchInsert(modelName, data));
                 }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
index 90dee0f75..e7d67f9bf 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorProcess.java
@@ -60,7 +60,7 @@ public void create(ModuleManager moduleManager, Class<? extends Indicator> indic
         IndicatorTransWorker transWorker = new IndicatorTransWorker(WorkerIdGenerator.INSTANCES.generate(), minutePersistentWorker, hourPersistentWorker, dayPersistentWorker, monthPersistentWorker);
         WorkerInstances.INSTANCES.put(transWorker.getWorkerId(), transWorker);
 
-        IndicatorRemoteWorker remoteWorker = new IndicatorRemoteWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager, transWorker);
+        IndicatorRemoteWorker remoteWorker = new IndicatorRemoteWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager, transWorker, modelName);
         WorkerInstances.INSTANCES.put(remoteWorker.getWorkerId(), remoteWorker);
 
         IndicatorAggregateWorker aggregateWorker = new IndicatorAggregateWorker(WorkerIdGenerator.INSTANCES.generate(), remoteWorker, modelName);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java
index 0295864f9..4c7b1379b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorRemoteWorker.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.generated.all.AllHeatmapIndicator;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
 import org.apache.skywalking.oap.server.core.remote.selector.Selector;
@@ -35,14 +36,21 @@
 
     private final AbstractWorker<Indicator> nextWorker;
     private final RemoteSenderService remoteSender;
+    private final String modelName;
 
-    IndicatorRemoteWorker(int workerId, ModuleManager moduleManager, AbstractWorker<Indicator> nextWorker) {
+    IndicatorRemoteWorker(int workerId, ModuleManager moduleManager, AbstractWorker<Indicator> nextWorker, String modelName) {
         super(workerId);
         this.remoteSender = moduleManager.find(CoreModule.NAME).getService(RemoteSenderService.class);
         this.nextWorker = nextWorker;
+        this.modelName = modelName;
     }
 
     @Override public final void in(Indicator indicator) {
+        if (modelName.equals("all_heatmap")) {
+            AllHeatmapIndicator allHeatmapIndicator = (AllHeatmapIndicator)indicator;
+            logger.error("remote indicator: {}", allHeatmapIndicator.getDetailGroup().toStorageData());
+        }
+
         try {
             remoteSender.send(nextWorker.getWorkerId(), indicator, Selector.HashCode);
         } catch (Throwable e) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
index d559ef82c..b9e67f05c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
@@ -41,7 +41,7 @@
         this.batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
     }
 
-    final void onWork(INPUT input) {
+    void onWork(INPUT input) {
         if (getCache().currentCollectionSize() >= batchSize) {
             try {
                 if (getCache().trySwitchPointer()) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInventoryRegister.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInventoryRegister.java
index f9a0b78b7..85a9c73c5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInventoryRegister.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInventoryRegister.java
@@ -52,6 +52,7 @@ private ServiceInventoryCache getServiceInventoryCache() {
 
     @Override public int getOrCreate(String serviceName) {
         int serviceId = getServiceInventoryCache().getServiceId(serviceName);
+        logger.info("register service: {}, service id: {}", serviceName, serviceId);
 
         if (serviceId == Const.NONE) {
             ServiceInventory serviceInventory = new ServiceInventory();
@@ -62,6 +63,7 @@ private ServiceInventoryCache getServiceInventoryCache() {
             long now = System.currentTimeMillis();
             serviceInventory.setRegisterTime(now);
             serviceInventory.setHeartbeatTime(now);
+            serviceInventory.setMappingServiceId(Const.NONE);
             serviceInventory.setMappingLastUpdateTime(now);
 
             InventoryProcess.INSTANCE.in(serviceInventory);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
index 4a677cecf..d5821f678 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -22,7 +22,7 @@
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
 import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
-import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.oap.server.core.register.*;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.slf4j.*;
 
@@ -54,6 +54,10 @@
     private void onWork(RegisterSource source) {
         messageNum++;
 
+        if (source instanceof ServiceInventory) {
+            logger.info("service register distinct, name {}", ((ServiceInventory)source).getName());
+        }
+
         if (!sources.containsKey(source)) {
             sources.put(source, source);
         } else {
@@ -61,7 +65,12 @@ private void onWork(RegisterSource source) {
         }
 
         if (messageNum >= 1000 || source.getEndOfBatchContext().isEndOfBatch()) {
-            sources.values().forEach(nextWorker::in);
+            sources.values().forEach(source1 -> {
+                if (source instanceof ServiceInventory) {
+                    logger.info("register distinct send to next, name: {}", ((ServiceInventory)source).getName());
+                }
+                nextWorker.in(source1);
+            });
             messageNum = 0;
         }
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index cfed8c306..cb8e61551 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -19,7 +19,10 @@
 package org.apache.skywalking.oap.server.core.register.worker;
 
 import java.util.*;
-import org.apache.skywalking.oap.server.core.register.RegisterSource;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
+import org.apache.skywalking.oap.server.core.register.*;
 import org.apache.skywalking.oap.server.core.source.Scope;
 import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
@@ -38,6 +41,7 @@
     private final Map<RegisterSource, RegisterSource> sources;
     private final IRegisterLockDAO registerLockDAO;
     private final IRegisterDAO registerDAO;
+    private final DataCarrier<RegisterSource> dataCarrier;
 
     RegisterPersistentWorker(int workerId, String modelName, ModuleManager moduleManager,
         IRegisterDAO registerDAO, Scope scope) {
@@ -47,9 +51,20 @@
         this.registerDAO = registerDAO;
         this.registerLockDAO = moduleManager.find(StorageModule.NAME).getService(IRegisterLockDAO.class);
         this.scope = scope;
+        this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
+        this.dataCarrier.consume(new RegisterPersistentWorker.PersistentConsumer(this), 1);
     }
 
     @Override public final void in(RegisterSource registerSource) {
+        registerSource.setEndOfBatchContext(new EndOfBatchContext(false));
+        dataCarrier.produce(registerSource);
+    }
+
+    private void onWork(RegisterSource registerSource) {
+        if (registerSource instanceof ServiceInventory) {
+            logger.info("service register persistent, name {}", ((ServiceInventory)registerSource).getName());
+        }
+
         if (!sources.containsKey(registerSource)) {
             sources.put(registerSource, registerSource);
         }
@@ -76,7 +91,43 @@
                 } finally {
                     registerLockDAO.releaseLock(scope);
                 }
+            } else {
+                logger.info("Inventory register try lock failure.");
             }
         }
     }
+
+    private class PersistentConsumer implements IConsumer<RegisterSource> {
+
+        private final RegisterPersistentWorker persistent;
+
+        private PersistentConsumer(RegisterPersistentWorker persistent) {
+            this.persistent = persistent;
+        }
+
+        @Override public void init() {
+
+        }
+
+        @Override public void consume(List<RegisterSource> data) {
+            Iterator<RegisterSource> sourceIterator = data.iterator();
+
+            int i = 0;
+            while (sourceIterator.hasNext()) {
+                RegisterSource indicator = sourceIterator.next();
+                i++;
+                if (i == data.size()) {
+                    indicator.getEndOfBatchContext().setEndOfBatch(true);
+                }
+                persistent.onWork(indicator);
+            }
+        }
+
+        @Override public void onError(List<RegisterSource> data, Throwable t) {
+            logger.error(t.getMessage(), t);
+        }
+
+        @Override public void onExit() {
+        }
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
index 0e9ff2fe7..c00a7755d 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -68,5 +68,7 @@ public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int bulkSi
                 }
             });
         }
+
+        this.bulkProcessor.flush();
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services