You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/12/21 23:54:35 UTC

[skywalking] branch master updated: Fix DataCarrier's `org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer` implementation isn't activated in `IF_POSSIBLE` mode. (#6048)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new e6265ba  Fix DataCarrier's `org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer` implementation isn't activated in `IF_POSSIBLE` mode. (#6048)
e6265ba is described below

commit e6265ba5be988c9b4c47252794a3684d544584d0
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Dec 22 07:54:15 2020 +0800

    Fix DataCarrier's `org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer` implementation isn't activated in `IF_POSSIBLE` mode. (#6048)
---
 CHANGES.md                                             |  1 +
 .../apm/commons/datacarrier/DataCarrier.java           | 18 +++++++++---------
 .../apm/commons/datacarrier/DataCarrierTest.java       |  8 +++-----
 .../agent/core/remote/TraceSegmentServiceClient.java   |  3 +--
 .../core/kafka/KafkaTraceSegmentServiceClient.java     |  3 +--
 5 files changed, 15 insertions(+), 18 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 77c01f7..43731e9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -19,6 +19,7 @@ Release Notes.
 * Update `byte-buddy` to 1.10.19.
 * Fix thrift plugin trace link broken when intermediate service does not mount agent
 * Fix thrift plugin collects wrong args when the method without parameter.
+* Fix the issue where DataCarrier's `org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer` implementation isn't activated in `IF_POSSIBLE` mode.
 
 #### OAP-Backend
 * Make meter receiver support MAL.
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
index 3f34846..66d3ff8 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
@@ -44,10 +44,18 @@ public class DataCarrier<T> {
     }
 
     public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize) {
+        this(name, envPrefix, channelSize, bufferSize, BufferStrategy.BLOCKING);
+    }
+
+    public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize, BufferStrategy strategy) {
         this.name = name;
         bufferSize = EnvUtil.getInt(envPrefix + "_BUFFER_SIZE", bufferSize);
         channelSize = EnvUtil.getInt(envPrefix + "_CHANNEL_SIZE", channelSize);
-        channels = new Channels<>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);
+        channels = new Channels<>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), strategy);
+    }
+
+    public DataCarrier(int channelSize, int bufferSize, BufferStrategy strategy) {
+        this("DEFAULT", "DEFAULT", channelSize, bufferSize, strategy);
     }
 
     /**
@@ -63,14 +71,6 @@ public class DataCarrier<T> {
     }
 
     /**
-     * override the strategy at runtime. Notice, {@link Channels} will override several channels one by one.
-     */
-    public DataCarrier setBufferStrategy(BufferStrategy strategy) {
-        this.channels.setStrategy(strategy);
-        return this;
-    }
-
-    /**
      * produce data to buffer, using the given {@link BufferStrategy}.
      *
      * @return false means produce data failure. The data will not be consumed.
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
index a67475b..ae496a0 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
@@ -33,7 +33,7 @@ import org.powermock.api.support.membermodification.MemberModifier;
 public class DataCarrierTest {
     @Test
     public void testCreateDataCarrier() throws IllegalAccessException {
-        DataCarrier<SampleData> carrier = new DataCarrier<>(5, 100);
+        DataCarrier<SampleData> carrier = new DataCarrier<>(5, 100, BufferStrategy.IF_POSSIBLE);
 
         Channels<SampleData> channels = (Channels<SampleData>) (MemberModifier.field(DataCarrier.class, "channels")
                                                                               .get(carrier));
@@ -42,8 +42,7 @@ public class DataCarrierTest {
         QueueBuffer<SampleData> buffer = channels.getBuffer(0);
         Assert.assertEquals(100, buffer.getBufferSize());
 
-        Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.BLOCKING);
-        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
+        Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.IF_POSSIBLE);
         Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy")
                                           .get(buffer), BufferStrategy.IF_POSSIBLE);
 
@@ -81,8 +80,7 @@ public class DataCarrierTest {
 
     @Test
     public void testIfPossibleProduce() throws IllegalAccessException {
-        DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
-        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
+        DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100, BufferStrategy.IF_POSSIBLE);
 
         for (int i = 0; i < 200; i++) {
             Assert.assertTrue(carrier.produce(new SampleData().setName("d" + i)));
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
index 7ebd366..c9583cc 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@@ -64,8 +64,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
         lastLogTime = System.currentTimeMillis();
         segmentUplinkedCounter = 0;
         segmentAbandonedCounter = 0;
-        carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE);
-        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
+        carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
         carrier.consume(this, 1);
     }
 
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
index 36572a8..c10ebd6 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
@@ -59,8 +59,7 @@ public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<Tr
 
     @Override
     public void boot() {
-        carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE);
-        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
+        carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
         carrier.consume(this, 1);
 
         producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();