You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/12/21 14:11:32 UTC
[skywalking] 01/01: Fix DataCarrier's
`org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer`
implementation isn't activated in `IF_POSSIBLE` mode.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch datacarrier-bug
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 09c0e8e32d643eb8a4189225925389ab0e483252
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Dec 21 22:08:10 2020 +0800
Fix DataCarrier's `org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer` implementation isn't activated in `IF_POSSIBLE` mode.
---
CHANGES.md | 1 +
.../apm/commons/datacarrier/DataCarrier.java | 18 +++++++++---------
.../apm/commons/datacarrier/DataCarrierTest.java | 8 +++-----
.../agent/core/remote/TraceSegmentServiceClient.java | 3 +--
.../core/kafka/KafkaTraceSegmentServiceClient.java | 3 +--
5 files changed, 15 insertions(+), 18 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 77c01f7..43731e9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -19,6 +19,7 @@ Release Notes.
* Update `byte-buddy` to 1.10.19.
* Fix thrift plugin trace link broken when intermediate service does not mount agent
* Fix thrift plugin collects wrong args when the method without parameter.
+* Fix DataCarrier's `org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer` implementation isn't activated in `IF_POSSIBLE` mode.
#### OAP-Backend
* Make meter receiver support MAL.
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
index 3f34846..66d3ff8 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
@@ -44,10 +44,18 @@ public class DataCarrier<T> {
}
public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize) {
+ this(name, envPrefix, channelSize, bufferSize, BufferStrategy.BLOCKING);
+ }
+
+ public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize, BufferStrategy strategy) {
this.name = name;
bufferSize = EnvUtil.getInt(envPrefix + "_BUFFER_SIZE", bufferSize);
channelSize = EnvUtil.getInt(envPrefix + "_CHANNEL_SIZE", channelSize);
- channels = new Channels<>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);
+ channels = new Channels<>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), strategy);
+ }
+
+ public DataCarrier(int channelSize, int bufferSize, BufferStrategy strategy) {
+ this("DEFAULT", "DEFAULT", channelSize, bufferSize, strategy);
}
/**
@@ -63,14 +71,6 @@ public class DataCarrier<T> {
}
/**
- * override the strategy at runtime. Notice, {@link Channels} will override several channels one by one.
- */
- public DataCarrier setBufferStrategy(BufferStrategy strategy) {
- this.channels.setStrategy(strategy);
- return this;
- }
-
- /**
* produce data to buffer, using the given {@link BufferStrategy}.
*
* @return false means produce data failure. The data will not be consumed.
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
index a67475b..ae496a0 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
@@ -33,7 +33,7 @@ import org.powermock.api.support.membermodification.MemberModifier;
public class DataCarrierTest {
@Test
public void testCreateDataCarrier() throws IllegalAccessException {
- DataCarrier<SampleData> carrier = new DataCarrier<>(5, 100);
+ DataCarrier<SampleData> carrier = new DataCarrier<>(5, 100, BufferStrategy.IF_POSSIBLE);
Channels<SampleData> channels = (Channels<SampleData>) (MemberModifier.field(DataCarrier.class, "channels")
.get(carrier));
@@ -42,8 +42,7 @@ public class DataCarrierTest {
QueueBuffer<SampleData> buffer = channels.getBuffer(0);
Assert.assertEquals(100, buffer.getBufferSize());
- Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.BLOCKING);
- carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
+ Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.IF_POSSIBLE);
Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy")
.get(buffer), BufferStrategy.IF_POSSIBLE);
@@ -81,8 +80,7 @@ public class DataCarrierTest {
@Test
public void testIfPossibleProduce() throws IllegalAccessException {
- DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
- carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
+ DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100, BufferStrategy.IF_POSSIBLE);
for (int i = 0; i < 200; i++) {
Assert.assertTrue(carrier.produce(new SampleData().setName("d" + i)));
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
index 7ebd366..c9583cc 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@@ -64,8 +64,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
lastLogTime = System.currentTimeMillis();
segmentUplinkedCounter = 0;
segmentAbandonedCounter = 0;
- carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE);
- carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
+ carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
carrier.consume(this, 1);
}
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
index 36572a8..c10ebd6 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
@@ -59,8 +59,7 @@ public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<Tr
@Override
public void boot() {
- carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE);
- carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
+ carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
carrier.consume(this, 1);
producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();