You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/11/13 09:56:02 UTC

[skywalking] branch blocking-queue created (now b1a9bdb)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a change to branch blocking-queue
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


      at b1a9bdb  Refactor DataCarrier, support ArrayBlockingQueueBuffer as the implementation for blocking queue buffer.

This branch includes the following new commits:

     new b1a9bdb  Refactor DataCarrier, support ArrayBlockingQueueBuffer as the implementation for blocking queue buffer.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking] 01/01: Refactor DataCarrier, support ArrayBlockingQueueBuffer as the implementation for blocking queue buffer.

Posted by wu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch blocking-queue
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit b1a9bdb448551344aa97c4b0a94390e0d02bd305
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Nov 13 17:55:11 2019 +0800

    Refactor DataCarrier, support ArrayBlockingQueueBuffer as the implementation for blocking queue buffer.
---
 .../commons/datacarrier/BlockingDataCarrier.java   |  4 -
 .../buffer/ArrayBlockingQueueBuffer.java           | 69 ++++++++++++++++
 .../apm/commons/datacarrier/buffer/Buffer.java     | 30 ++-----
 .../commons/datacarrier/buffer/BufferStrategy.java |  1 -
 .../apm/commons/datacarrier/buffer/Channels.java   | 25 +++---
 .../QueueBuffer.java}                              | 31 ++++++--
 .../datacarrier/consumer/ConsumeDriver.java        | 54 ++++---------
 .../datacarrier/consumer/ConsumerThread.java       | 29 ++-----
 .../consumer/MultipleChannelsConsumer.java         |  3 +-
 .../apm/commons/datacarrier/DataCarrierTest.java   | 46 ++++-------
 .../datacarrier/consumer/BulkConsumePoolTest.java  | 91 ----------------------
 .../commons/datacarrier/consumer/ConsumerTest.java |  2 +-
 .../core/remote/client/GRPCRemoteClient.java       |  1 -
 13 files changed, 152 insertions(+), 234 deletions(-)

diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/BlockingDataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/BlockingDataCarrier.java
index 8b97080..399cc8e 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/BlockingDataCarrier.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/BlockingDataCarrier.java
@@ -19,7 +19,6 @@
 package org.apache.skywalking.apm.commons.datacarrier;
 
 import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
-import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
 
 /**
  * @author wu-sheng
@@ -31,7 +30,4 @@ public class BlockingDataCarrier<T> {
         this.channels = channels;
     }
 
-    public void addCallback(QueueBlockingCallback<T> callback) {
-        this.channels.addCallback(callback);
-    }
 }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java
new file mode 100644
index 0000000..421a1db
--- /dev/null
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.commons.datacarrier.buffer;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * The buffer implementation based on JDK ArrayBlockingQueue.
+ *
+ * This implementation has better performance on the server side. We are still researching whether it is suitable
+ * for the agent side, which is more sensitive to blocking.
+ *
+ * @author wusheng
+ */
+public class ArrayBlockingQueueBuffer<T> implements QueueBuffer<T> {
+    private BufferStrategy strategy;
+    private ArrayBlockingQueue<T> queue;
+    private int bufferSize;
+
+    ArrayBlockingQueueBuffer(int bufferSize, BufferStrategy strategy) {
+        this.strategy = strategy;
+        this.queue = new ArrayBlockingQueue<T>(bufferSize);
+        this.bufferSize = bufferSize;
+    }
+
+    @Override public boolean save(T data) {
+        switch (strategy){
+            case IF_POSSIBLE:
+                return queue.offer(data);
+            default:
+                try {
+                    queue.put(data);
+                } catch (InterruptedException e) {
+                    // Ignore the error
+                    return false;
+                }
+        }
+        return true;
+    }
+
+    @Override public void setStrategy(BufferStrategy strategy) {
+        this.strategy = strategy;
+    }
+
+    @Override public void obtain(List<T> consumeList) {
+        queue.drainTo(consumeList);
+    }
+
+    @Override public int getBufferSize() {
+        return bufferSize;
+    }
+}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
index d869832..b4419a7 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
@@ -18,49 +18,36 @@
 
 package org.apache.skywalking.apm.commons.datacarrier.buffer;
 
-import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
-import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
-
-import java.util.LinkedList;
 import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
 
 /**
- * Created by wusheng on 2016/10/25.
+ * Self-implemented ring queue.
+ *
+ * @author wusheng
  */
-public class Buffer<T> {
+public class Buffer<T> implements QueueBuffer<T> {
     private final Object[] buffer;
     private BufferStrategy strategy;
     private AtomicRangeInteger index;
-    private List<QueueBlockingCallback<T>> callbacks;
 
     Buffer(int bufferSize, BufferStrategy strategy) {
         buffer = new Object[bufferSize];
         this.strategy = strategy;
         index = new AtomicRangeInteger(0, bufferSize);
-        callbacks = new LinkedList<QueueBlockingCallback<T>>();
     }
 
-    void setStrategy(BufferStrategy strategy) {
+    public void setStrategy(BufferStrategy strategy) {
         this.strategy = strategy;
     }
 
-    void addCallback(QueueBlockingCallback<T> callback) {
-        callbacks.add(callback);
-    }
 
-    boolean save(T data) {
+    public boolean save(T data) {
         int i = index.getAndIncrement();
         if (buffer[i] != null) {
             switch (strategy) {
                 case BLOCKING:
-                    boolean isFirstTimeBlocking = true;
                     while (buffer[i] != null) {
-                        if (isFirstTimeBlocking) {
-                            isFirstTimeBlocking = false;
-                            for (QueueBlockingCallback<T> callback : callbacks) {
-                                callback.notify(data);
-                            }
-                        }
                         try {
                             Thread.sleep(1L);
                         } catch (InterruptedException e) {
@@ -69,7 +56,6 @@ public class Buffer<T> {
                     break;
                 case IF_POSSIBLE:
                     return false;
-                case OVERRIDE:
                 default:
             }
         }
@@ -85,7 +71,7 @@ public class Buffer<T> {
         this.obtain(consumeList, 0, buffer.length);
     }
 
-    public void obtain(List<T> consumeList, int start, int end) {
+    void obtain(List<T> consumeList, int start, int end) {
         for (int i = start; i < end; i++) {
             if (buffer[i] != null) {
                 consumeList.add((T)buffer[i]);
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferStrategy.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferStrategy.java
index fdad2e8..a26a324 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferStrategy.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferStrategy.java
@@ -24,6 +24,5 @@ package org.apache.skywalking.apm.commons.datacarrier.buffer;
  */
 public enum BufferStrategy {
     BLOCKING,
-    OVERRIDE,
     IF_POSSIBLE
 }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
index af8cefc..1f13cc2 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
@@ -18,15 +18,14 @@
 
 package org.apache.skywalking.apm.commons.datacarrier.buffer;
 
-import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
 import org.apache.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
 
 /**
- * Channels of Buffer It contains all buffer data which belongs to this channel. It supports several strategy when buffer
- * is full. The Default is BLOCKING <p> Created by wusheng on 2016/10/25.
+ * Channels of Buffer. It contains all buffer data which belongs to this channel. It supports several strategies when
+ * the buffer is full. The default is BLOCKING. <p> Created by wusheng on 2016/10/25.
  */
 public class Channels<T> {
-    private final Buffer<T>[] bufferChannels;
+    private final QueueBuffer<T>[] bufferChannels;
     private IDataPartitioner<T> dataPartitioner;
     private BufferStrategy strategy;
     private final long size;
@@ -34,9 +33,13 @@ public class Channels<T> {
     public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
         this.dataPartitioner = partitioner;
         this.strategy = strategy;
-        bufferChannels = new Buffer[channelSize];
+        bufferChannels = new QueueBuffer[channelSize];
         for (int i = 0; i < channelSize; i++) {
-            bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
+            if (BufferStrategy.BLOCKING.equals(strategy)) {
+                bufferChannels[i] = new ArrayBlockingQueueBuffer<T>(bufferSize, strategy);
+            } else {
+                bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
+            }
         }
         size = channelSize * bufferSize;
     }
@@ -69,7 +72,7 @@ public class Channels<T> {
      * @param strategy
      */
     public void setStrategy(BufferStrategy strategy) {
-        for (Buffer<T> buffer : bufferChannels) {
+        for (QueueBuffer<T> buffer : bufferChannels) {
             buffer.setStrategy(strategy);
         }
     }
@@ -87,13 +90,7 @@ public class Channels<T> {
         return size;
     }
 
-    public Buffer<T> getBuffer(int index) {
+    public QueueBuffer<T> getBuffer(int index) {
         return this.bufferChannels[index];
     }
-
-    public void addCallback(QueueBlockingCallback<T> callback) {
-        for (Buffer<T> channel : bufferChannels) {
-            channel.addCallback(callback);
-        }
-    }
 }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/callback/QueueBlockingCallback.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/QueueBuffer.java
similarity index 58%
rename from apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/callback/QueueBlockingCallback.java
rename to apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/QueueBuffer.java
index c0a6c47..5789919 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/callback/QueueBlockingCallback.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/QueueBuffer.java
@@ -16,13 +16,34 @@
  *
  */
 
-package org.apache.skywalking.apm.commons.datacarrier.callback;
+package org.apache.skywalking.apm.commons.datacarrier.buffer;
+
+import java.util.List;
 
 /**
- * Notify when the queue, which is in blocking strategy, has be blocked.
+ * Queue buffer interface.
  *
- * @author wu-sheng
+ * @author wusheng
  */
-public interface QueueBlockingCallback<T> {
-    void notify(T message);
+public interface QueueBuffer<T> {
+    /**
+     * Save data into the queue.
+     * @param data to add.
+     * @return true if saved
+     */
+    boolean save(T data);
+
+    /**
+     * Set different strategy when queue is full.
+     * @param strategy
+     */
+    void setStrategy(BufferStrategy strategy);
+
+    /**
+     * Obtain the existing data from the queue
+     * @param consumeList
+     */
+    void obtain(List<T> consumeList);
+
+    int getBufferSize();
 }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
index 3dcc26b..fcf06ed 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
@@ -18,9 +18,8 @@
 
 package org.apache.skywalking.apm.commons.datacarrier.consumer;
 
-import java.util.ArrayList;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
 
 /**
  * Pool of consumers <p> Created by wusheng on 2016/10/25.
@@ -93,44 +92,19 @@ public class ConsumeDriver<T> implements IDriver {
 
     private void allocateBuffer2Thread() {
         int channelSize = this.channels.getChannelSize();
-        if (channelSize < consumerThreads.length) {
-            /**
-             * if consumerThreads.length > channelSize
-             * each channel will be process by several consumers.
-             */
-            ArrayList<Integer>[] threadAllocation = new ArrayList[channelSize];
-            for (int threadIndex = 0; threadIndex < consumerThreads.length; threadIndex++) {
-                int index = threadIndex % channelSize;
-                if (threadAllocation[index] == null) {
-                    threadAllocation[index] = new ArrayList<Integer>();
-                }
-                threadAllocation[index].add(threadIndex);
-            }
-
-            for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
-                ArrayList<Integer> threadAllocationPerChannel = threadAllocation[channelIndex];
-                Buffer<T> channel = this.channels.getBuffer(channelIndex);
-                int bufferSize = channel.getBufferSize();
-                int step = bufferSize / threadAllocationPerChannel.size();
-                for (int i = 0; i < threadAllocationPerChannel.size(); i++) {
-                    int threadIndex = threadAllocationPerChannel.get(i);
-                    int start = i * step;
-                    int end = i == threadAllocationPerChannel.size() - 1 ? bufferSize : (i + 1) * step;
-                    consumerThreads[threadIndex].addDataSource(channel, start, end);
-                }
-            }
-        } else {
-            /**
-             * if consumerThreads.length < channelSize
-             * each consumer will process several channels.
-             *
-             * if consumerThreads.length == channelSize
-             * each consumer will process one channel.
-             */
-            for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
-                int consumerIndex = channelIndex % consumerThreads.length;
-                consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
-            }
+        /**
+         * if consumerThreads.length < channelSize
+         * each consumer will process several channels.
+         *
+         * if consumerThreads.length == channelSize
+         * each consumer will process one channel.
+         *
+         * if consumerThreads.length > channelSize
+         * some consumer threads are assigned no channel and do nothing.
+         */
+        for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
+            int consumerIndex = channelIndex % consumerThreads.length;
+            consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
         }
 
     }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
index 6a15c1c..15c01f5 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
@@ -19,10 +19,10 @@
 
 package org.apache.skywalking.apm.commons.datacarrier.consumer;
 
-import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
-
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.QueueBuffer;
 
 /**
  * Created by wusheng on 2016/10/25.
@@ -42,23 +42,12 @@ public class ConsumerThread<T> extends Thread {
     }
 
     /**
-     * add partition of buffer to consume
-     *
-     * @param sourceBuffer
-     * @param start
-     * @param end
-     */
-    void addDataSource(Buffer<T> sourceBuffer, int start, int end) {
-        this.dataSources.add(new DataSource(sourceBuffer, start, end));
-    }
-
-    /**
      * add whole buffer to consume
      *
      * @param sourceBuffer
      */
-    void addDataSource(Buffer<T> sourceBuffer) {
-        this.dataSources.add(new DataSource(sourceBuffer, 0, sourceBuffer.getBufferSize()));
+    void addDataSource(QueueBuffer<T> sourceBuffer) {
+        this.dataSources.add(new DataSource(sourceBuffer));
     }
 
     @Override
@@ -108,18 +97,14 @@ public class ConsumerThread<T> extends Thread {
      * DataSource is a refer to {@link Buffer}.
      */
     class DataSource {
-        private Buffer<T> sourceBuffer;
-        private int start;
-        private int end;
+        private QueueBuffer<T> sourceBuffer;
 
-        DataSource(Buffer<T> sourceBuffer, int start, int end) {
+        DataSource(QueueBuffer<T> sourceBuffer) {
             this.sourceBuffer = sourceBuffer;
-            this.start = start;
-            this.end = end;
         }
 
         void obtain(List<T> consumeList) {
-            sourceBuffer.obtain(consumeList, start, end);
+            sourceBuffer.obtain(consumeList);
         }
     }
 }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
index 4963788..cca11c5 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.QueueBuffer;
 
 /**
  * MultipleChannelsConsumer represent a single consumer thread, but support multiple channels with their {@link
@@ -73,7 +74,7 @@ public class MultipleChannelsConsumer extends Thread {
 
     private boolean consume(Group target, List consumeList) {
         for (int i = 0; i < target.channels.getChannelSize(); i++) {
-            Buffer buffer = target.channels.getBuffer(i);
+            QueueBuffer buffer = target.channels.getBuffer(i);
             buffer.obtain(consumeList);
         }
 
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
index 845b726..a3d9120 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.QueueBuffer;
 import org.apache.skywalking.apm.commons.datacarrier.partition.ProducerThreadPartitioner;
 import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
 import org.junit.Assert;
@@ -42,14 +43,14 @@ public class DataCarrierTest {
         Assert.assertEquals(((Integer)(MemberModifier.field(DataCarrier.class, "channelSize").get(carrier))).intValue(), 5);
 
         Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
-        Assert.assertEquals(channels.getChannelSize(), 5);
+        Assert.assertEquals(5, channels.getChannelSize());
 
-        Buffer<SampleData> buffer = channels.getBuffer(0);
-        Assert.assertEquals(buffer.getBufferSize(), 100);
+        QueueBuffer<SampleData> buffer = channels.getBuffer(0);
+        Assert.assertEquals(100, buffer.getBufferSize());
 
-        Assert.assertEquals(MemberModifier.field(Buffer.class, "strategy").get(buffer), BufferStrategy.BLOCKING);
+        Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.BLOCKING);
         carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
-        Assert.assertEquals(MemberModifier.field(Buffer.class, "strategy").get(buffer), BufferStrategy.IF_POSSIBLE);
+        Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.IF_POSSIBLE);
 
         Assert.assertEquals(MemberModifier.field(Channels.class, "dataPartitioner").get(channels).getClass(), SimpleRollingPartitioner.class);
         carrier.setPartitioner(new ProducerThreadPartitioner<SampleData>());
@@ -65,39 +66,20 @@ public class DataCarrierTest {
         Assert.assertTrue(carrier.produce(new SampleData().setName("d")));
 
         Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
-        Buffer<SampleData> buffer1 = channels.getBuffer(0);
+        QueueBuffer<SampleData> buffer1 = channels.getBuffer(0);
 
         List result = new ArrayList();
-        buffer1.obtain(result, 0, 100);
+        buffer1.obtain(result);
         Assert.assertEquals(2, result.size());
 
-        Buffer<SampleData> buffer2 = channels.getBuffer(1);
-        buffer2.obtain(result, 0, 100);
+        QueueBuffer<SampleData> buffer2 = channels.getBuffer(1);
+        buffer2.obtain(result);
 
         Assert.assertEquals(4, result.size());
 
     }
 
     @Test
-    public void testOverrideProduce() throws IllegalAccessException {
-        DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
-        carrier.setBufferStrategy(BufferStrategy.OVERRIDE);
-
-        for (int i = 0; i < 500; i++) {
-            Assert.assertTrue(carrier.produce(new SampleData().setName("d" + i)));
-        }
-
-        Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
-        Buffer<SampleData> buffer1 = channels.getBuffer(0);
-        List result = new ArrayList();
-        buffer1.obtain(result, 0, 100);
-
-        Buffer<SampleData> buffer2 = channels.getBuffer(1);
-        buffer2.obtain(result, 0, 100);
-        Assert.assertEquals(200, result.size());
-    }
-
-    @Test
     public void testIfPossibleProduce() throws IllegalAccessException {
         DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
         carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
@@ -111,12 +93,12 @@ public class DataCarrierTest {
         }
 
         Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
-        Buffer<SampleData> buffer1 = channels.getBuffer(0);
+        QueueBuffer<SampleData> buffer1 = channels.getBuffer(0);
         List result = new ArrayList();
-        buffer1.obtain(result, 0, 100);
+        buffer1.obtain(result);
 
-        Buffer<SampleData> buffer2 = channels.getBuffer(1);
-        buffer2.obtain(result, 0, 100);
+        QueueBuffer<SampleData> buffer2 = channels.getBuffer(1);
+        buffer2.obtain(result);
         Assert.assertEquals(200, result.size());
     }
 
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePoolTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePoolTest.java
deleted file mode 100644
index 9cd9c8c..0000000
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePoolTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.apm.commons.datacarrier.consumer;
-
-import java.util.*;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
-import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
-import org.junit.*;
-
-/**
- * @author wusheng
- */
-public class BulkConsumePoolTest {
-    @Test
-    public void testOneThreadPool() throws InterruptedException {
-        BulkConsumePool pool = new BulkConsumePool("testPool", 1, 50);
-        final ArrayList<Object> result1 = new ArrayList();
-        Channels c1 = new Channels(2, 10, new SimpleRollingPartitioner(), BufferStrategy.OVERRIDE);
-        pool.add("test", c1,
-            new IConsumer() {
-                @Override public void init() {
-
-                }
-
-                @Override public void consume(List data) {
-                    for (Object datum : data) {
-                        result1.add(datum);
-                    }
-                }
-
-                @Override public void onError(List data, Throwable t) {
-
-                }
-
-                @Override public void onExit() {
-
-                }
-            });
-        pool.begin(c1);
-        final ArrayList<Object> result2 = new ArrayList();
-        Channels c2 = new Channels(2, 10, new SimpleRollingPartitioner(), BufferStrategy.OVERRIDE);
-        pool.add("test2", c2,
-            new IConsumer() {
-                @Override public void init() {
-
-                }
-
-                @Override public void consume(List data) {
-                    for (Object datum : data) {
-                        result2.add(datum);
-                    }
-                }
-
-                @Override public void onError(List data, Throwable t) {
-
-                }
-
-                @Override public void onExit() {
-
-                }
-            });
-        pool.begin(c2);
-        c1.save(new Object());
-        c1.save(new Object());
-        c1.save(new Object());
-        c1.save(new Object());
-        c1.save(new Object());
-        c2.save(new Object());
-        c2.save(new Object());
-        Thread.sleep(2000);
-
-        Assert.assertEquals(5, result1.size());
-        Assert.assertEquals(2, result2.size());
-    }
-}
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
index 4433b9c..b9dfd8f 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
@@ -80,7 +80,7 @@ public class ConsumerTest {
         for (SampleData data : result) {
             consumerCounter.add(data.getIntValue());
         }
-        Assert.assertEquals(5, consumerCounter.size());
+        Assert.assertEquals(2, consumerCounter.size());
     }
 
     @Test
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index e671ef4..5b86ae7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -113,7 +113,6 @@ public class GRPCRemoteClient implements RemoteClient {
             synchronized (GRPCRemoteClient.class) {
                 if (Objects.isNull(this.carrier)) {
                     this.carrier = new DataCarrier<>("GRPCRemoteClient", channelSize, bufferSize);
-                    this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
                 }
             }
         }