You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/15 02:47:50 UTC
[incubator-skywalking] branch new-consumer-pool updated: Alter thread model.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch new-consumer-pool
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/new-consumer-pool by this push:
new a28434f Alter thread model.
a28434f is described below
commit a28434fb3bf318362d1e537d77f4faa0c0042ce6
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Feb 15 10:47:42 2019 +0800
Alter thread model.
---
.../commons/datacarrier/consumer/BulkConsumePool.java | 4 ++--
docker/config/application.yml | 2 +-
.../en/setup/service-agent/java-agent/Supported-list.md | 2 +-
.../core/analysis/worker/RecordPersistentWorker.java | 14 ++++++++++++--
.../core/register/worker/RegisterDistinctWorker.java | 14 +++++++++++---
.../core/register/worker/RegisterPersistentWorker.java | 17 +++++++++++++----
.../server-starter/src/main/assembly/application.yml | 2 +-
.../server-starter/src/main/resources/application.yml | 2 +-
8 files changed, 42 insertions(+), 15 deletions(-)
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
index ba5ace8..798a601 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
@@ -104,9 +104,9 @@ public class BulkConsumePool implements ConsumerPool {
private int size;
private long consumeCycle;
- public Creator(String name, int size, long consumeCycle) {
+ public Creator(String name, int poolSize, long consumeCycle) {
this.name = name;
- this.size = size;
+ this.size = poolSize;
this.consumeCycle = consumeCycle;
}
diff --git a/docker/config/application.yml b/docker/config/application.yml
index f9b645f..d99357a 100644
--- a/docker/config/application.yml
+++ b/docker/config/application.yml
@@ -75,7 +75,7 @@ receiver-trace:
bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
- slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
+ slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} # The slow database access thresholds. Unit ms.
receiver-jvm:
default:
service-mesh:
diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md
index 2c04fe6..5851f30 100644
--- a/docs/en/setup/service-agent/java-agent/Supported-list.md
+++ b/docs/en/setup/service-agent/java-agent/Supported-list.md
@@ -62,6 +62,6 @@
* [GSON](https://github.com/google/gson) 2.8.x (Optional²)
¹Due to license incompatibilities/restrictions these plugins are hosted and released in 3rd part repository,
- go to [OpenSkywalking java plugin extension repository](https://github.com/OpenSkywalking/java-plugin-extensions) to get these.
+ go to [SkyAPM java plugin extension repository](https://github.com/SkyAPM/java-plugin-extensions) to get these.
²These plugins affect the performance or must be used under some conditions, from experiences. So only released in `/optional-plugins`, copy to `/plugins` in order to make them work.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
index 8d291d7..a6b972e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
@@ -20,7 +20,8 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.NonMergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
@@ -45,8 +46,17 @@ public class RecordPersistentWorker extends PersistenceWorker<Record, NonMergeDa
this.modelName = modelName;
this.nonMergeDataCache = new NonMergeDataCache<>();
this.recordDAO = recordDAO;
+
+ String name = "RECORD_PERSISTENT";
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
+ try {
+ ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+ } catch (Exception e) {
+ throw new UnexpectedException(e.getMessage(), e);
+ }
+
this.dataCarrier = new DataCarrier<>(1, 10000);
- this.dataCarrier.consume(new RecordPersistentWorker.PersistentConsumer(this), 1);
+ this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new RecordPersistentWorker.PersistentConsumer(this));
}
@Override public void in(Record record) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
index 443eb73..ae4bc67 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -20,7 +20,8 @@ package org.apache.skywalking.oap.server.core.register.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
@@ -42,8 +43,15 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
super(workerId);
this.nextWorker = nextWorker;
this.sources = new HashMap<>();
- this.dataCarrier = new DataCarrier<>(1, 10000);
- this.dataCarrier.consume(new AggregatorConsumer(this), 1, 200);
+ this.dataCarrier = new DataCarrier<>(1, 1000);
+ String name = "REGISTER_L1";
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+ try {
+ ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+ } catch (Exception e) {
+ throw new UnexpectedException(e.getMessage(), e);
+ }
+ this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this));
}
@Override public final void in(RegisterSource source) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 35a83b4..1ccd21d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -20,8 +20,8 @@ package org.apache.skywalking.oap.server.core.register.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.Scope;
@@ -52,8 +52,17 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
this.registerDAO = registerDAO;
this.registerLockDAO = moduleManager.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class);
this.scope = scope;
- this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
- this.dataCarrier.consume(new RegisterPersistentWorker.PersistentConsumer(this), 1, 200);
+ this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 1000);
+
+ String name = "REGISTER_L2";
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+ try {
+ ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+ } catch (Exception e) {
+ throw new UnexpectedException(e.getMessage(), e);
+ }
+
+ this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new RegisterPersistentWorker.PersistentConsumer(this));
}
@Override public final void in(RegisterSource registerSource) {
diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml
index 840777d..b8c0efc 100644
--- a/oap-server/server-starter/src/main/assembly/application.yml
+++ b/oap-server/server-starter/src/main/assembly/application.yml
@@ -75,7 +75,7 @@ receiver-trace:
bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
- slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
+ slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} # The slow database access thresholds. Unit ms.
receiver-jvm:
default:
#service-mesh:
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 28c7b97..24df981 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -75,7 +75,7 @@ receiver-trace:
bufferDataMaxFileSize: ${SW_RECEIVER_BUFFER_DATA_MAX_FILE_SIZE:500} # Unit is MB
bufferFileCleanWhenRestart: ${SW_RECEIVER_BUFFER_FILE_CLEAN_WHEN_RESTART:false}
sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
- slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,redis:50} # The slow database access thresholds. Unit ms.
+ slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} # The slow database access thresholds. Unit ms.
receiver-jvm:
default:
service-mesh: