You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/11/13 09:56:03 UTC

[skywalking] 01/01: Refactor DataCarrier, support ArrayBlockingQueueBuffer as the implementation for blocking queue buffer.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch blocking-queue
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit b1a9bdb448551344aa97c4b0a94390e0d02bd305
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Nov 13 17:55:11 2019 +0800

    Refactor DataCarrier, support ArrayBlockingQueueBuffer as the implementation for blocking queue buffer.
---
 .../commons/datacarrier/BlockingDataCarrier.java   |  4 -
 .../buffer/ArrayBlockingQueueBuffer.java           | 69 ++++++++++++++++
 .../apm/commons/datacarrier/buffer/Buffer.java     | 30 ++-----
 .../commons/datacarrier/buffer/BufferStrategy.java |  1 -
 .../apm/commons/datacarrier/buffer/Channels.java   | 25 +++---
 .../QueueBuffer.java}                              | 31 ++++++--
 .../datacarrier/consumer/ConsumeDriver.java        | 54 ++++---------
 .../datacarrier/consumer/ConsumerThread.java       | 29 ++-----
 .../consumer/MultipleChannelsConsumer.java         |  3 +-
 .../apm/commons/datacarrier/DataCarrierTest.java   | 46 ++++-------
 .../datacarrier/consumer/BulkConsumePoolTest.java  | 91 ----------------------
 .../commons/datacarrier/consumer/ConsumerTest.java |  2 +-
 .../core/remote/client/GRPCRemoteClient.java       |  1 -
 13 files changed, 152 insertions(+), 234 deletions(-)

diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/BlockingDataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/BlockingDataCarrier.java
index 8b97080..399cc8e 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/BlockingDataCarrier.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/BlockingDataCarrier.java
@@ -19,7 +19,6 @@
 package org.apache.skywalking.apm.commons.datacarrier;
 
 import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
-import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
 
 /**
  * @author wu-sheng
@@ -31,7 +30,4 @@ public class BlockingDataCarrier<T> {
         this.channels = channels;
     }
 
-    public void addCallback(QueueBlockingCallback<T> callback) {
-        this.channels.addCallback(callback);
-    }
 }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java
new file mode 100644
index 0000000..421a1db
--- /dev/null
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.commons.datacarrier.buffer;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * The buffer implementation based on JDK ArrayBlockingQueue.
+ *
+ * This implementation has better performance in server side. We are still trying to research whether this is suitable
+ * for agent side, which is more sensitive about blocks.
+ *
+ * @author wusheng
+ */
+public class ArrayBlockingQueueBuffer<T> implements QueueBuffer<T> {
+    private BufferStrategy strategy;
+    private ArrayBlockingQueue<T> queue;
+    private int bufferSize;
+
+    ArrayBlockingQueueBuffer(int bufferSize, BufferStrategy strategy) {
+        this.strategy = strategy;
+        this.queue = new ArrayBlockingQueue<T>(bufferSize);
+        this.bufferSize = bufferSize;
+    }
+
+    @Override public boolean save(T data) {
+        switch (strategy){
+            case IF_POSSIBLE:
+                return queue.offer(data);
+            default:
+                try {
+                    queue.put(data);
+                } catch (InterruptedException e) {
+                    // Ignore the error
+                    return false;
+                }
+        }
+        return true;
+    }
+
+    @Override public void setStrategy(BufferStrategy strategy) {
+        this.strategy = strategy;
+    }
+
+    @Override public void obtain(List<T> consumeList) {
+        queue.drainTo(consumeList);
+    }
+
+    @Override public int getBufferSize() {
+        return bufferSize;
+    }
+}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
index d869832..b4419a7 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
@@ -18,49 +18,36 @@
 
 package org.apache.skywalking.apm.commons.datacarrier.buffer;
 
-import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
-import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
-
-import java.util.LinkedList;
 import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
 
 /**
- * Created by wusheng on 2016/10/25.
+ * Self implementation ring queue.
+ *
+ * @author wusheng
  */
-public class Buffer<T> {
+public class Buffer<T> implements QueueBuffer<T> {
     private final Object[] buffer;
     private BufferStrategy strategy;
     private AtomicRangeInteger index;
-    private List<QueueBlockingCallback<T>> callbacks;
 
     Buffer(int bufferSize, BufferStrategy strategy) {
         buffer = new Object[bufferSize];
         this.strategy = strategy;
         index = new AtomicRangeInteger(0, bufferSize);
-        callbacks = new LinkedList<QueueBlockingCallback<T>>();
     }
 
-    void setStrategy(BufferStrategy strategy) {
+    public void setStrategy(BufferStrategy strategy) {
         this.strategy = strategy;
     }
 
-    void addCallback(QueueBlockingCallback<T> callback) {
-        callbacks.add(callback);
-    }
 
-    boolean save(T data) {
+    public boolean save(T data) {
         int i = index.getAndIncrement();
         if (buffer[i] != null) {
             switch (strategy) {
                 case BLOCKING:
-                    boolean isFirstTimeBlocking = true;
                     while (buffer[i] != null) {
-                        if (isFirstTimeBlocking) {
-                            isFirstTimeBlocking = false;
-                            for (QueueBlockingCallback<T> callback : callbacks) {
-                                callback.notify(data);
-                            }
-                        }
                         try {
                             Thread.sleep(1L);
                         } catch (InterruptedException e) {
@@ -69,7 +56,6 @@ public class Buffer<T> {
                     break;
                 case IF_POSSIBLE:
                     return false;
-                case OVERRIDE:
                 default:
             }
         }
@@ -85,7 +71,7 @@ public class Buffer<T> {
         this.obtain(consumeList, 0, buffer.length);
     }
 
-    public void obtain(List<T> consumeList, int start, int end) {
+    void obtain(List<T> consumeList, int start, int end) {
         for (int i = start; i < end; i++) {
             if (buffer[i] != null) {
                 consumeList.add((T)buffer[i]);
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferStrategy.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferStrategy.java
index fdad2e8..a26a324 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferStrategy.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferStrategy.java
@@ -24,6 +24,5 @@ package org.apache.skywalking.apm.commons.datacarrier.buffer;
  */
 public enum BufferStrategy {
     BLOCKING,
-    OVERRIDE,
     IF_POSSIBLE
 }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
index af8cefc..1f13cc2 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
@@ -18,15 +18,14 @@
 
 package org.apache.skywalking.apm.commons.datacarrier.buffer;
 
-import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
 import org.apache.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
 
 /**
- * Channels of Buffer It contains all buffer data which belongs to this channel. It supports several strategy when buffer
- * is full. The Default is BLOCKING <p> Created by wusheng on 2016/10/25.
+ * Channels of Buffer It contains all buffer data which belongs to this channel. It supports several strategy when
+ * buffer is full. The Default is BLOCKING <p> Created by wusheng on 2016/10/25.
  */
 public class Channels<T> {
-    private final Buffer<T>[] bufferChannels;
+    private final QueueBuffer<T>[] bufferChannels;
     private IDataPartitioner<T> dataPartitioner;
     private BufferStrategy strategy;
     private final long size;
@@ -34,9 +33,13 @@ public class Channels<T> {
     public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
         this.dataPartitioner = partitioner;
         this.strategy = strategy;
-        bufferChannels = new Buffer[channelSize];
+        bufferChannels = new QueueBuffer[channelSize];
         for (int i = 0; i < channelSize; i++) {
-            bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
+            if (BufferStrategy.BLOCKING.equals(strategy)) {
+                bufferChannels[i] = new ArrayBlockingQueueBuffer<T>(bufferSize, strategy);
+            } else {
+                bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
+            }
         }
         size = channelSize * bufferSize;
     }
@@ -69,7 +72,7 @@ public class Channels<T> {
      * @param strategy
      */
     public void setStrategy(BufferStrategy strategy) {
-        for (Buffer<T> buffer : bufferChannels) {
+        for (QueueBuffer<T> buffer : bufferChannels) {
             buffer.setStrategy(strategy);
         }
     }
@@ -87,13 +90,7 @@ public class Channels<T> {
         return size;
     }
 
-    public Buffer<T> getBuffer(int index) {
+    public QueueBuffer<T> getBuffer(int index) {
         return this.bufferChannels[index];
     }
-
-    public void addCallback(QueueBlockingCallback<T> callback) {
-        for (Buffer<T> channel : bufferChannels) {
-            channel.addCallback(callback);
-        }
-    }
 }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/callback/QueueBlockingCallback.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/QueueBuffer.java
similarity index 58%
rename from apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/callback/QueueBlockingCallback.java
rename to apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/QueueBuffer.java
index c0a6c47..5789919 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/callback/QueueBlockingCallback.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/QueueBuffer.java
@@ -16,13 +16,34 @@
  *
  */
 
-package org.apache.skywalking.apm.commons.datacarrier.callback;
+package org.apache.skywalking.apm.commons.datacarrier.buffer;
+
+import java.util.List;
 
 /**
- * Notify when the queue, which is in blocking strategy, has be blocked.
+ * Queue buffer interface.
  *
- * @author wu-sheng
+ * @author wusheng
  */
-public interface QueueBlockingCallback<T> {
-    void notify(T message);
+public interface QueueBuffer<T> {
+    /**
+     * Save data into the queue;
+     * @param data to add.
+     * @return true if saved
+     */
+    boolean save(T data);
+
+    /**
+     * Set different strategy when queue is full.
+     * @param strategy
+     */
+    void setStrategy(BufferStrategy strategy);
+
+    /**
+     * Obtain the existing data from the queue
+     * @param consumeList
+     */
+    void obtain(List<T> consumeList);
+
+    int getBufferSize();
 }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
index 3dcc26b..fcf06ed 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
@@ -18,9 +18,8 @@
 
 package org.apache.skywalking.apm.commons.datacarrier.consumer;
 
-import java.util.ArrayList;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
 
 /**
  * Pool of consumers <p> Created by wusheng on 2016/10/25.
@@ -93,44 +92,19 @@ public class ConsumeDriver<T> implements IDriver {
 
     private void allocateBuffer2Thread() {
         int channelSize = this.channels.getChannelSize();
-        if (channelSize < consumerThreads.length) {
-            /**
-             * if consumerThreads.length > channelSize
-             * each channel will be process by several consumers.
-             */
-            ArrayList<Integer>[] threadAllocation = new ArrayList[channelSize];
-            for (int threadIndex = 0; threadIndex < consumerThreads.length; threadIndex++) {
-                int index = threadIndex % channelSize;
-                if (threadAllocation[index] == null) {
-                    threadAllocation[index] = new ArrayList<Integer>();
-                }
-                threadAllocation[index].add(threadIndex);
-            }
-
-            for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
-                ArrayList<Integer> threadAllocationPerChannel = threadAllocation[channelIndex];
-                Buffer<T> channel = this.channels.getBuffer(channelIndex);
-                int bufferSize = channel.getBufferSize();
-                int step = bufferSize / threadAllocationPerChannel.size();
-                for (int i = 0; i < threadAllocationPerChannel.size(); i++) {
-                    int threadIndex = threadAllocationPerChannel.get(i);
-                    int start = i * step;
-                    int end = i == threadAllocationPerChannel.size() - 1 ? bufferSize : (i + 1) * step;
-                    consumerThreads[threadIndex].addDataSource(channel, start, end);
-                }
-            }
-        } else {
-            /**
-             * if consumerThreads.length < channelSize
-             * each consumer will process several channels.
-             *
-             * if consumerThreads.length == channelSize
-             * each consumer will process one channel.
-             */
-            for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
-                int consumerIndex = channelIndex % consumerThreads.length;
-                consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
-            }
+        /**
+         * if consumerThreads.length < channelSize
+         * each consumer will process several channels.
+         *
+         * if consumerThreads.length == channelSize
+         * each consumer will process one channel.
+         *
+         * if consumerThreads.length > channelSize
+         * there will be some threads do nothing.
+         */
+        for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
+            int consumerIndex = channelIndex % consumerThreads.length;
+            consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
         }
 
     }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
index 6a15c1c..15c01f5 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
@@ -19,10 +19,10 @@
 
 package org.apache.skywalking.apm.commons.datacarrier.consumer;
 
-import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
-
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.QueueBuffer;
 
 /**
  * Created by wusheng on 2016/10/25.
@@ -42,23 +42,12 @@ public class ConsumerThread<T> extends Thread {
     }
 
     /**
-     * add partition of buffer to consume
-     *
-     * @param sourceBuffer
-     * @param start
-     * @param end
-     */
-    void addDataSource(Buffer<T> sourceBuffer, int start, int end) {
-        this.dataSources.add(new DataSource(sourceBuffer, start, end));
-    }
-
-    /**
      * add whole buffer to consume
      *
      * @param sourceBuffer
      */
-    void addDataSource(Buffer<T> sourceBuffer) {
-        this.dataSources.add(new DataSource(sourceBuffer, 0, sourceBuffer.getBufferSize()));
+    void addDataSource(QueueBuffer<T> sourceBuffer) {
+        this.dataSources.add(new DataSource(sourceBuffer));
     }
 
     @Override
@@ -108,18 +97,14 @@ public class ConsumerThread<T> extends Thread {
      * DataSource is a refer to {@link Buffer}.
      */
     class DataSource {
-        private Buffer<T> sourceBuffer;
-        private int start;
-        private int end;
+        private QueueBuffer<T> sourceBuffer;
 
-        DataSource(Buffer<T> sourceBuffer, int start, int end) {
+        DataSource(QueueBuffer<T> sourceBuffer) {
             this.sourceBuffer = sourceBuffer;
-            this.start = start;
-            this.end = end;
         }
 
         void obtain(List<T> consumeList) {
-            sourceBuffer.obtain(consumeList, start, end);
+            sourceBuffer.obtain(consumeList);
         }
     }
 }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
index 4963788..cca11c5 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.QueueBuffer;
 
 /**
  * MultipleChannelsConsumer represent a single consumer thread, but support multiple channels with their {@link
@@ -73,7 +74,7 @@ public class MultipleChannelsConsumer extends Thread {
 
     private boolean consume(Group target, List consumeList) {
         for (int i = 0; i < target.channels.getChannelSize(); i++) {
-            Buffer buffer = target.channels.getBuffer(i);
+            QueueBuffer buffer = target.channels.getBuffer(i);
             buffer.obtain(consumeList);
         }
 
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
index 845b726..a3d9120 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.QueueBuffer;
 import org.apache.skywalking.apm.commons.datacarrier.partition.ProducerThreadPartitioner;
 import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
 import org.junit.Assert;
@@ -42,14 +43,14 @@ public class DataCarrierTest {
         Assert.assertEquals(((Integer)(MemberModifier.field(DataCarrier.class, "channelSize").get(carrier))).intValue(), 5);
 
         Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
-        Assert.assertEquals(channels.getChannelSize(), 5);
+        Assert.assertEquals(5, channels.getChannelSize());
 
-        Buffer<SampleData> buffer = channels.getBuffer(0);
-        Assert.assertEquals(buffer.getBufferSize(), 100);
+        QueueBuffer<SampleData> buffer = channels.getBuffer(0);
+        Assert.assertEquals(100, buffer.getBufferSize());
 
-        Assert.assertEquals(MemberModifier.field(Buffer.class, "strategy").get(buffer), BufferStrategy.BLOCKING);
+        Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.BLOCKING);
         carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
-        Assert.assertEquals(MemberModifier.field(Buffer.class, "strategy").get(buffer), BufferStrategy.IF_POSSIBLE);
+        Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.IF_POSSIBLE);
 
         Assert.assertEquals(MemberModifier.field(Channels.class, "dataPartitioner").get(channels).getClass(), SimpleRollingPartitioner.class);
         carrier.setPartitioner(new ProducerThreadPartitioner<SampleData>());
@@ -65,39 +66,20 @@ public class DataCarrierTest {
         Assert.assertTrue(carrier.produce(new SampleData().setName("d")));
 
         Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
-        Buffer<SampleData> buffer1 = channels.getBuffer(0);
+        QueueBuffer<SampleData> buffer1 = channels.getBuffer(0);
 
         List result = new ArrayList();
-        buffer1.obtain(result, 0, 100);
+        buffer1.obtain(result);
         Assert.assertEquals(2, result.size());
 
-        Buffer<SampleData> buffer2 = channels.getBuffer(1);
-        buffer2.obtain(result, 0, 100);
+        QueueBuffer<SampleData> buffer2 = channels.getBuffer(1);
+        buffer2.obtain(result);
 
         Assert.assertEquals(4, result.size());
 
     }
 
     @Test
-    public void testOverrideProduce() throws IllegalAccessException {
-        DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
-        carrier.setBufferStrategy(BufferStrategy.OVERRIDE);
-
-        for (int i = 0; i < 500; i++) {
-            Assert.assertTrue(carrier.produce(new SampleData().setName("d" + i)));
-        }
-
-        Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
-        Buffer<SampleData> buffer1 = channels.getBuffer(0);
-        List result = new ArrayList();
-        buffer1.obtain(result, 0, 100);
-
-        Buffer<SampleData> buffer2 = channels.getBuffer(1);
-        buffer2.obtain(result, 0, 100);
-        Assert.assertEquals(200, result.size());
-    }
-
-    @Test
     public void testIfPossibleProduce() throws IllegalAccessException {
         DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
         carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
@@ -111,12 +93,12 @@ public class DataCarrierTest {
         }
 
         Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
-        Buffer<SampleData> buffer1 = channels.getBuffer(0);
+        QueueBuffer<SampleData> buffer1 = channels.getBuffer(0);
         List result = new ArrayList();
-        buffer1.obtain(result, 0, 100);
+        buffer1.obtain(result);
 
-        Buffer<SampleData> buffer2 = channels.getBuffer(1);
-        buffer2.obtain(result, 0, 100);
+        QueueBuffer<SampleData> buffer2 = channels.getBuffer(1);
+        buffer2.obtain(result);
         Assert.assertEquals(200, result.size());
     }
 
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePoolTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePoolTest.java
deleted file mode 100644
index 9cd9c8c..0000000
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePoolTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.apm.commons.datacarrier.consumer;
-
-import java.util.*;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
-import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
-import org.junit.*;
-
-/**
- * @author wusheng
- */
-public class BulkConsumePoolTest {
-    @Test
-    public void testOneThreadPool() throws InterruptedException {
-        BulkConsumePool pool = new BulkConsumePool("testPool", 1, 50);
-        final ArrayList<Object> result1 = new ArrayList();
-        Channels c1 = new Channels(2, 10, new SimpleRollingPartitioner(), BufferStrategy.OVERRIDE);
-        pool.add("test", c1,
-            new IConsumer() {
-                @Override public void init() {
-
-                }
-
-                @Override public void consume(List data) {
-                    for (Object datum : data) {
-                        result1.add(datum);
-                    }
-                }
-
-                @Override public void onError(List data, Throwable t) {
-
-                }
-
-                @Override public void onExit() {
-
-                }
-            });
-        pool.begin(c1);
-        final ArrayList<Object> result2 = new ArrayList();
-        Channels c2 = new Channels(2, 10, new SimpleRollingPartitioner(), BufferStrategy.OVERRIDE);
-        pool.add("test2", c2,
-            new IConsumer() {
-                @Override public void init() {
-
-                }
-
-                @Override public void consume(List data) {
-                    for (Object datum : data) {
-                        result2.add(datum);
-                    }
-                }
-
-                @Override public void onError(List data, Throwable t) {
-
-                }
-
-                @Override public void onExit() {
-
-                }
-            });
-        pool.begin(c2);
-        c1.save(new Object());
-        c1.save(new Object());
-        c1.save(new Object());
-        c1.save(new Object());
-        c1.save(new Object());
-        c2.save(new Object());
-        c2.save(new Object());
-        Thread.sleep(2000);
-
-        Assert.assertEquals(5, result1.size());
-        Assert.assertEquals(2, result2.size());
-    }
-}
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
index 4433b9c..b9dfd8f 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
@@ -80,7 +80,7 @@ public class ConsumerTest {
         for (SampleData data : result) {
             consumerCounter.add(data.getIntValue());
         }
-        Assert.assertEquals(5, consumerCounter.size());
+        Assert.assertEquals(2, consumerCounter.size());
     }
 
     @Test
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index e671ef4..5b86ae7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -113,7 +113,6 @@ public class GRPCRemoteClient implements RemoteClient {
             synchronized (GRPCRemoteClient.class) {
                 if (Objects.isNull(this.carrier)) {
                     this.carrier = new DataCarrier<>("GRPCRemoteClient", channelSize, bufferSize);
-                    this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
                 }
             }
         }