You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/12/21 14:11:31 UTC

[skywalking] branch datacarrier-bug created (now 09c0e8e)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a change to branch datacarrier-bug
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


      at 09c0e8e  Fix DataCarrier's `org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer` implementation isn't activated in `IF_POSSIBLE` mode.

This branch includes the following new commits:

     new 09c0e8e  Fix DataCarrier's `org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer` implementation isn't activated in `IF_POSSIBLE` mode.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking] 01/01: Fix DataCarrier's `org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer` implementation isn't activated in `IF_POSSIBLE` mode.

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch datacarrier-bug
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 09c0e8e32d643eb8a4189225925389ab0e483252
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Dec 21 22:08:10 2020 +0800

    Fix DataCarrier's `org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer` implementation isn't activated in `IF_POSSIBLE` mode.
---
 CHANGES.md                                             |  1 +
 .../apm/commons/datacarrier/DataCarrier.java           | 18 +++++++++---------
 .../apm/commons/datacarrier/DataCarrierTest.java       |  8 +++-----
 .../agent/core/remote/TraceSegmentServiceClient.java   |  3 +--
 .../core/kafka/KafkaTraceSegmentServiceClient.java     |  3 +--
 5 files changed, 15 insertions(+), 18 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 77c01f7..43731e9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -19,6 +19,7 @@ Release Notes.
 * Update `byte-buddy` to 1.10.19.
 * Fix thrift plugin trace link broken when intermediate service does not mount agent
 * Fix thrift plugin collects wrong args when the method without parameter.
+* Fix DataCarrier's `org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer` implementation isn't activated in `IF_POSSIBLE` mode.
 
 #### OAP-Backend
 * Make meter receiver support MAL.
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
index 3f34846..66d3ff8 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
@@ -44,10 +44,18 @@ public class DataCarrier<T> {
     }
 
     public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize) {
+        this(name, envPrefix, channelSize, bufferSize, BufferStrategy.BLOCKING);
+    }
+
+    public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize, BufferStrategy strategy) {
         this.name = name;
         bufferSize = EnvUtil.getInt(envPrefix + "_BUFFER_SIZE", bufferSize);
         channelSize = EnvUtil.getInt(envPrefix + "_CHANNEL_SIZE", channelSize);
-        channels = new Channels<>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);
+        channels = new Channels<>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), strategy);
+    }
+
+    public DataCarrier(int channelSize, int bufferSize, BufferStrategy strategy) {
+        this("DEFAULT", "DEFAULT", channelSize, bufferSize, strategy);
     }
 
     /**
@@ -63,14 +71,6 @@ public class DataCarrier<T> {
     }
 
     /**
-     * override the strategy at runtime. Notice, {@link Channels} will override several channels one by one.
-     */
-    public DataCarrier setBufferStrategy(BufferStrategy strategy) {
-        this.channels.setStrategy(strategy);
-        return this;
-    }
-
-    /**
      * produce data to buffer, using the given {@link BufferStrategy}.
      *
      * @return false means produce data failure. The data will not be consumed.
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
index a67475b..ae496a0 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
@@ -33,7 +33,7 @@ import org.powermock.api.support.membermodification.MemberModifier;
 public class DataCarrierTest {
     @Test
     public void testCreateDataCarrier() throws IllegalAccessException {
-        DataCarrier<SampleData> carrier = new DataCarrier<>(5, 100);
+        DataCarrier<SampleData> carrier = new DataCarrier<>(5, 100, BufferStrategy.IF_POSSIBLE);
 
         Channels<SampleData> channels = (Channels<SampleData>) (MemberModifier.field(DataCarrier.class, "channels")
                                                                               .get(carrier));
@@ -42,8 +42,7 @@ public class DataCarrierTest {
         QueueBuffer<SampleData> buffer = channels.getBuffer(0);
         Assert.assertEquals(100, buffer.getBufferSize());
 
-        Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.BLOCKING);
-        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
+        Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.IF_POSSIBLE);
         Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy")
                                           .get(buffer), BufferStrategy.IF_POSSIBLE);
 
@@ -81,8 +80,7 @@ public class DataCarrierTest {
 
     @Test
     public void testIfPossibleProduce() throws IllegalAccessException {
-        DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
-        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
+        DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100, BufferStrategy.IF_POSSIBLE);
 
         for (int i = 0; i < 200; i++) {
             Assert.assertTrue(carrier.produce(new SampleData().setName("d" + i)));
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
index 7ebd366..c9583cc 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@@ -64,8 +64,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
         lastLogTime = System.currentTimeMillis();
         segmentUplinkedCounter = 0;
         segmentAbandonedCounter = 0;
-        carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE);
-        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
+        carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
         carrier.consume(this, 1);
     }
 
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
index 36572a8..c10ebd6 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaTraceSegmentServiceClient.java
@@ -59,8 +59,7 @@ public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<Tr
 
     @Override
     public void boot() {
-        carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE);
-        carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
+        carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
         carrier.consume(this, 1);
 
         producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();