You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/15 02:47:50 UTC

[incubator-skywalking] branch new-consumer-pool updated: Alter thread model.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch new-consumer-pool
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/new-consumer-pool by this push:
     new a28434f  Alter thread model.
a28434f is described below

commit a28434fb3bf318362d1e537d77f4faa0c0042ce6
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Feb 15 10:47:42 2019 +0800

    Alter thread model.
---
 .../commons/datacarrier/consumer/BulkConsumePool.java   |  4 ++--
 docker/config/application.yml                           |  2 +-
 .../en/setup/service-agent/java-agent/Supported-list.md |  2 +-
 .../core/analysis/worker/RecordPersistentWorker.java    | 14 ++++++++++++--
 .../core/register/worker/RegisterDistinctWorker.java    | 14 +++++++++++---
 .../core/register/worker/RegisterPersistentWorker.java  | 17 +++++++++++++----
 .../server-starter/src/main/assembly/application.yml    |  2 +-
 .../server-starter/src/main/resources/application.yml   |  2 +-
 8 files changed, 42 insertions(+), 15 deletions(-)

diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
index ba5ace8..798a601 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
@@ -104,9 +104,9 @@ public class BulkConsumePool implements ConsumerPool {
         private int size;
         private long consumeCycle;
 
-        public Creator(String name, int size, long consumeCycle) {
+        public Creator(String name, int poolSize, long consumeCycle) {
             this.name = name;
-            this.size = size;
+            this.size = poolSize;
             this.consumeCycle = consumeCycle;
         }
 
diff --git a/docker/config/application.yml b/docker/config/application.yml
index f9b645f..d99357a 100644
--- a/docker/config/application.yml
+++ b/docker/config/application.yml
@@ -75,7 +75,7 @@ receiver-trace:
     bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
     bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
     sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
-    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
+    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} # The slow database access thresholds. Unit ms.
 receiver-jvm:
   default:
 service-mesh:
diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md
index 2c04fe6..5851f30 100644
--- a/docs/en/setup/service-agent/java-agent/Supported-list.md
+++ b/docs/en/setup/service-agent/java-agent/Supported-list.md
@@ -62,6 +62,6 @@
   * [GSON](https://github.com/google/gson) 2.8.x (Optional²)
 
 ¹Due to license incompatibilities/restrictions these plugins are hosted and released in 3rd part repository, 
- go to [OpenSkywalking java plugin extension repository](https://github.com/OpenSkywalking/java-plugin-extensions) to get these.
+ go to [SkyAPM java plugin extension repository](https://github.com/SkyAPM/java-plugin-extensions) to get these.
 
 ²These plugins affect the performance or must be used under some conditions, from experiences. So only released in `/optional-plugins`, copy to `/plugins` in order to make them work.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
index 8d291d7..a6b972e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
@@ -20,7 +20,8 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.data.NonMergeDataCache;
 import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
@@ -45,8 +46,17 @@ public class RecordPersistentWorker extends PersistenceWorker<Record, NonMergeDa
         this.modelName = modelName;
         this.nonMergeDataCache = new NonMergeDataCache<>();
         this.recordDAO = recordDAO;
+
+        String name = "RECORD_PERSISTENT";
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
+        try {
+            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+        } catch (Exception e) {
+            throw new UnexpectedException(e.getMessage(), e);
+        }
+
         this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(new RecordPersistentWorker.PersistentConsumer(this), 1);
+        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new RecordPersistentWorker.PersistentConsumer(this));
     }
 
     @Override public void in(Record record) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
index 443eb73..ae4bc67 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -20,7 +20,8 @@ package org.apache.skywalking.oap.server.core.register.worker;
 
 import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
@@ -42,8 +43,15 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
         super(workerId);
         this.nextWorker = nextWorker;
         this.sources = new HashMap<>();
-        this.dataCarrier = new DataCarrier<>(1, 10000);
-        this.dataCarrier.consume(new AggregatorConsumer(this), 1, 200);
+        this.dataCarrier = new DataCarrier<>(1, 1000);
+        String name = "REGISTER_L1";
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+        try {
+            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+        } catch (Exception e) {
+            throw new UnexpectedException(e.getMessage(), e);
+        }
+        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this));
     }
 
     @Override public final void in(RegisterSource source) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 35a83b4..1ccd21d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -20,8 +20,8 @@ package org.apache.skywalking.oap.server.core.register.worker;
 
 import java.util.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.*;
 import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
 import org.apache.skywalking.oap.server.core.source.Scope;
@@ -52,8 +52,17 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
         this.registerDAO = registerDAO;
         this.registerLockDAO = moduleManager.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class);
         this.scope = scope;
-        this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
-        this.dataCarrier.consume(new RegisterPersistentWorker.PersistentConsumer(this), 1, 200);
+        this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 1000);
+
+        String name = "REGISTER_L2";
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+        try {
+            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+        } catch (Exception e) {
+            throw new UnexpectedException(e.getMessage(), e);
+        }
+
+        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new RegisterPersistentWorker.PersistentConsumer(this));
     }
 
     @Override public final void in(RegisterSource registerSource) {
diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml
index 840777d..b8c0efc 100644
--- a/oap-server/server-starter/src/main/assembly/application.yml
+++ b/oap-server/server-starter/src/main/assembly/application.yml
@@ -75,7 +75,7 @@ receiver-trace:
     bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
     bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
     sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
-    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
+    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} # The slow database access thresholds. Unit ms.
 receiver-jvm:
   default:
 #service-mesh:
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 28c7b97..24df981 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -75,7 +75,7 @@ receiver-trace:
     bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
     bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
     sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
-    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
+    slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} # The slow database access thresholds. Unit ms.
 receiver-jvm:
   default:
 service-mesh: