You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/11/13 09:56:03 UTC
[skywalking] 01/01: Refactor DataCarrier,
support ArrayBlockingQueueBuffer as the implementation for blocking
queue buffer.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch blocking-queue
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit b1a9bdb448551344aa97c4b0a94390e0d02bd305
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Nov 13 17:55:11 2019 +0800
Refactor DataCarrier, support ArrayBlockingQueueBuffer as the implementation for blocking queue buffer.
---
.../commons/datacarrier/BlockingDataCarrier.java | 4 -
.../buffer/ArrayBlockingQueueBuffer.java | 69 ++++++++++++++++
.../apm/commons/datacarrier/buffer/Buffer.java | 30 ++-----
.../commons/datacarrier/buffer/BufferStrategy.java | 1 -
.../apm/commons/datacarrier/buffer/Channels.java | 25 +++---
.../QueueBuffer.java} | 31 ++++++--
.../datacarrier/consumer/ConsumeDriver.java | 54 ++++---------
.../datacarrier/consumer/ConsumerThread.java | 29 ++-----
.../consumer/MultipleChannelsConsumer.java | 3 +-
.../apm/commons/datacarrier/DataCarrierTest.java | 46 ++++-------
.../datacarrier/consumer/BulkConsumePoolTest.java | 91 ----------------------
.../commons/datacarrier/consumer/ConsumerTest.java | 2 +-
.../core/remote/client/GRPCRemoteClient.java | 1 -
13 files changed, 152 insertions(+), 234 deletions(-)
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/BlockingDataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/BlockingDataCarrier.java
index 8b97080..399cc8e 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/BlockingDataCarrier.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/BlockingDataCarrier.java
@@ -19,7 +19,6 @@
package org.apache.skywalking.apm.commons.datacarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
-import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
/**
* @author wu-sheng
@@ -31,7 +30,4 @@ public class BlockingDataCarrier<T> {
this.channels = channels;
}
- public void addCallback(QueueBlockingCallback<T> callback) {
- this.channels.addCallback(callback);
- }
}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java
new file mode 100644
index 0000000..421a1db
--- /dev/null
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/ArrayBlockingQueueBuffer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.commons.datacarrier.buffer;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * The buffer implementation based on JDK ArrayBlockingQueue.
+ *
+ * This implementation has better performance in server side. We are still trying to research whether this is suitable
+ * for agent side, which is more sensitive about blocks.
+ *
+ * @author wusheng
+ */
+public class ArrayBlockingQueueBuffer<T> implements QueueBuffer<T> {
+ private BufferStrategy strategy;
+ private ArrayBlockingQueue<T> queue;
+ private int bufferSize;
+
+ ArrayBlockingQueueBuffer(int bufferSize, BufferStrategy strategy) {
+ this.strategy = strategy;
+ this.queue = new ArrayBlockingQueue<T>(bufferSize);
+ this.bufferSize = bufferSize;
+ }
+
+ @Override public boolean save(T data) {
+ switch (strategy){
+ case IF_POSSIBLE:
+ return queue.offer(data);
+ default:
+ try {
+ queue.put(data);
+ } catch (InterruptedException e) {
+ // Ignore the error
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override public void setStrategy(BufferStrategy strategy) {
+ this.strategy = strategy;
+ }
+
+ @Override public void obtain(List<T> consumeList) {
+ queue.drainTo(consumeList);
+ }
+
+ @Override public int getBufferSize() {
+ return bufferSize;
+ }
+}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
index d869832..b4419a7 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
@@ -18,49 +18,36 @@
package org.apache.skywalking.apm.commons.datacarrier.buffer;
-import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
-import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
-
-import java.util.LinkedList;
import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
/**
- * Created by wusheng on 2016/10/25.
+ * Self implementation ring queue.
+ *
+ * @author wusheng
*/
-public class Buffer<T> {
+public class Buffer<T> implements QueueBuffer<T> {
private final Object[] buffer;
private BufferStrategy strategy;
private AtomicRangeInteger index;
- private List<QueueBlockingCallback<T>> callbacks;
Buffer(int bufferSize, BufferStrategy strategy) {
buffer = new Object[bufferSize];
this.strategy = strategy;
index = new AtomicRangeInteger(0, bufferSize);
- callbacks = new LinkedList<QueueBlockingCallback<T>>();
}
- void setStrategy(BufferStrategy strategy) {
+ public void setStrategy(BufferStrategy strategy) {
this.strategy = strategy;
}
- void addCallback(QueueBlockingCallback<T> callback) {
- callbacks.add(callback);
- }
- boolean save(T data) {
+ public boolean save(T data) {
int i = index.getAndIncrement();
if (buffer[i] != null) {
switch (strategy) {
case BLOCKING:
- boolean isFirstTimeBlocking = true;
while (buffer[i] != null) {
- if (isFirstTimeBlocking) {
- isFirstTimeBlocking = false;
- for (QueueBlockingCallback<T> callback : callbacks) {
- callback.notify(data);
- }
- }
try {
Thread.sleep(1L);
} catch (InterruptedException e) {
@@ -69,7 +56,6 @@ public class Buffer<T> {
break;
case IF_POSSIBLE:
return false;
- case OVERRIDE:
default:
}
}
@@ -85,7 +71,7 @@ public class Buffer<T> {
this.obtain(consumeList, 0, buffer.length);
}
- public void obtain(List<T> consumeList, int start, int end) {
+ void obtain(List<T> consumeList, int start, int end) {
for (int i = start; i < end; i++) {
if (buffer[i] != null) {
consumeList.add((T)buffer[i]);
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferStrategy.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferStrategy.java
index fdad2e8..a26a324 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferStrategy.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferStrategy.java
@@ -24,6 +24,5 @@ package org.apache.skywalking.apm.commons.datacarrier.buffer;
*/
public enum BufferStrategy {
BLOCKING,
- OVERRIDE,
IF_POSSIBLE
}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
index af8cefc..1f13cc2 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
@@ -18,15 +18,14 @@
package org.apache.skywalking.apm.commons.datacarrier.buffer;
-import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
import org.apache.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
/**
- * Channels of Buffer It contains all buffer data which belongs to this channel. It supports several strategy when buffer
- * is full. The Default is BLOCKING <p> Created by wusheng on 2016/10/25.
+ * Channels of Buffer It contains all buffer data which belongs to this channel. It supports several strategy when
+ * buffer is full. The Default is BLOCKING <p> Created by wusheng on 2016/10/25.
*/
public class Channels<T> {
- private final Buffer<T>[] bufferChannels;
+ private final QueueBuffer<T>[] bufferChannels;
private IDataPartitioner<T> dataPartitioner;
private BufferStrategy strategy;
private final long size;
@@ -34,9 +33,13 @@ public class Channels<T> {
public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
this.dataPartitioner = partitioner;
this.strategy = strategy;
- bufferChannels = new Buffer[channelSize];
+ bufferChannels = new QueueBuffer[channelSize];
for (int i = 0; i < channelSize; i++) {
- bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
+ if (BufferStrategy.BLOCKING.equals(strategy)) {
+ bufferChannels[i] = new ArrayBlockingQueueBuffer<T>(bufferSize, strategy);
+ } else {
+ bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
+ }
}
size = channelSize * bufferSize;
}
@@ -69,7 +72,7 @@ public class Channels<T> {
* @param strategy
*/
public void setStrategy(BufferStrategy strategy) {
- for (Buffer<T> buffer : bufferChannels) {
+ for (QueueBuffer<T> buffer : bufferChannels) {
buffer.setStrategy(strategy);
}
}
@@ -87,13 +90,7 @@ public class Channels<T> {
return size;
}
- public Buffer<T> getBuffer(int index) {
+ public QueueBuffer<T> getBuffer(int index) {
return this.bufferChannels[index];
}
-
- public void addCallback(QueueBlockingCallback<T> callback) {
- for (Buffer<T> channel : bufferChannels) {
- channel.addCallback(callback);
- }
- }
}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/callback/QueueBlockingCallback.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/QueueBuffer.java
similarity index 58%
rename from apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/callback/QueueBlockingCallback.java
rename to apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/QueueBuffer.java
index c0a6c47..5789919 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/callback/QueueBlockingCallback.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/QueueBuffer.java
@@ -16,13 +16,34 @@
*
*/
-package org.apache.skywalking.apm.commons.datacarrier.callback;
+package org.apache.skywalking.apm.commons.datacarrier.buffer;
+
+import java.util.List;
/**
- * Notify when the queue, which is in blocking strategy, has be blocked.
+ * Queue buffer interface.
*
- * @author wu-sheng
+ * @author wusheng
*/
-public interface QueueBlockingCallback<T> {
- void notify(T message);
+public interface QueueBuffer<T> {
+ /**
+ * Save data into the queue;
+ * @param data to add.
+ * @return true if saved
+ */
+ boolean save(T data);
+
+ /**
+ * Set different strategy when queue is full.
+ * @param strategy
+ */
+ void setStrategy(BufferStrategy strategy);
+
+ /**
+ * Obtain the existing data from the queue
+ * @param consumeList
+ */
+ void obtain(List<T> consumeList);
+
+ int getBufferSize();
}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
index 3dcc26b..fcf06ed 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
@@ -18,9 +18,8 @@
package org.apache.skywalking.apm.commons.datacarrier.consumer;
-import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
/**
* Pool of consumers <p> Created by wusheng on 2016/10/25.
@@ -93,44 +92,19 @@ public class ConsumeDriver<T> implements IDriver {
private void allocateBuffer2Thread() {
int channelSize = this.channels.getChannelSize();
- if (channelSize < consumerThreads.length) {
- /**
- * if consumerThreads.length > channelSize
- * each channel will be process by several consumers.
- */
- ArrayList<Integer>[] threadAllocation = new ArrayList[channelSize];
- for (int threadIndex = 0; threadIndex < consumerThreads.length; threadIndex++) {
- int index = threadIndex % channelSize;
- if (threadAllocation[index] == null) {
- threadAllocation[index] = new ArrayList<Integer>();
- }
- threadAllocation[index].add(threadIndex);
- }
-
- for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
- ArrayList<Integer> threadAllocationPerChannel = threadAllocation[channelIndex];
- Buffer<T> channel = this.channels.getBuffer(channelIndex);
- int bufferSize = channel.getBufferSize();
- int step = bufferSize / threadAllocationPerChannel.size();
- for (int i = 0; i < threadAllocationPerChannel.size(); i++) {
- int threadIndex = threadAllocationPerChannel.get(i);
- int start = i * step;
- int end = i == threadAllocationPerChannel.size() - 1 ? bufferSize : (i + 1) * step;
- consumerThreads[threadIndex].addDataSource(channel, start, end);
- }
- }
- } else {
- /**
- * if consumerThreads.length < channelSize
- * each consumer will process several channels.
- *
- * if consumerThreads.length == channelSize
- * each consumer will process one channel.
- */
- for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
- int consumerIndex = channelIndex % consumerThreads.length;
- consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
- }
+ /**
+ * if consumerThreads.length < channelSize
+ * each consumer will process several channels.
+ *
+ * if consumerThreads.length == channelSize
+ * each consumer will process one channel.
+ *
+ * if consumerThreads.length > channelSize
+ * there will be some threads do nothing.
+ */
+ for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
+ int consumerIndex = channelIndex % consumerThreads.length;
+ consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
}
}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
index 6a15c1c..15c01f5 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerThread.java
@@ -19,10 +19,10 @@
package org.apache.skywalking.apm.commons.datacarrier.consumer;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
-
import java.util.ArrayList;
import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.QueueBuffer;
/**
* Created by wusheng on 2016/10/25.
@@ -42,23 +42,12 @@ public class ConsumerThread<T> extends Thread {
}
/**
- * add partition of buffer to consume
- *
- * @param sourceBuffer
- * @param start
- * @param end
- */
- void addDataSource(Buffer<T> sourceBuffer, int start, int end) {
- this.dataSources.add(new DataSource(sourceBuffer, start, end));
- }
-
- /**
* add whole buffer to consume
*
* @param sourceBuffer
*/
- void addDataSource(Buffer<T> sourceBuffer) {
- this.dataSources.add(new DataSource(sourceBuffer, 0, sourceBuffer.getBufferSize()));
+ void addDataSource(QueueBuffer<T> sourceBuffer) {
+ this.dataSources.add(new DataSource(sourceBuffer));
}
@Override
@@ -108,18 +97,14 @@ public class ConsumerThread<T> extends Thread {
* DataSource is a refer to {@link Buffer}.
*/
class DataSource {
- private Buffer<T> sourceBuffer;
- private int start;
- private int end;
+ private QueueBuffer<T> sourceBuffer;
- DataSource(Buffer<T> sourceBuffer, int start, int end) {
+ DataSource(QueueBuffer<T> sourceBuffer) {
this.sourceBuffer = sourceBuffer;
- this.start = start;
- this.end = end;
}
void obtain(List<T> consumeList) {
- sourceBuffer.obtain(consumeList, start, end);
+ sourceBuffer.obtain(consumeList);
}
}
}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
index 4963788..cca11c5 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
import java.util.ArrayList;
import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.QueueBuffer;
/**
* MultipleChannelsConsumer represent a single consumer thread, but support multiple channels with their {@link
@@ -73,7 +74,7 @@ public class MultipleChannelsConsumer extends Thread {
private boolean consume(Group target, List consumeList) {
for (int i = 0; i < target.channels.getChannelSize(); i++) {
- Buffer buffer = target.channels.getBuffer(i);
+ QueueBuffer buffer = target.channels.getBuffer(i);
buffer.obtain(consumeList);
}
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
index 845b726..a3d9120 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrierTest.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.QueueBuffer;
import org.apache.skywalking.apm.commons.datacarrier.partition.ProducerThreadPartitioner;
import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
import org.junit.Assert;
@@ -42,14 +43,14 @@ public class DataCarrierTest {
Assert.assertEquals(((Integer)(MemberModifier.field(DataCarrier.class, "channelSize").get(carrier))).intValue(), 5);
Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
- Assert.assertEquals(channels.getChannelSize(), 5);
+ Assert.assertEquals(5, channels.getChannelSize());
- Buffer<SampleData> buffer = channels.getBuffer(0);
- Assert.assertEquals(buffer.getBufferSize(), 100);
+ QueueBuffer<SampleData> buffer = channels.getBuffer(0);
+ Assert.assertEquals(100, buffer.getBufferSize());
- Assert.assertEquals(MemberModifier.field(Buffer.class, "strategy").get(buffer), BufferStrategy.BLOCKING);
+ Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.BLOCKING);
carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
- Assert.assertEquals(MemberModifier.field(Buffer.class, "strategy").get(buffer), BufferStrategy.IF_POSSIBLE);
+ Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.IF_POSSIBLE);
Assert.assertEquals(MemberModifier.field(Channels.class, "dataPartitioner").get(channels).getClass(), SimpleRollingPartitioner.class);
carrier.setPartitioner(new ProducerThreadPartitioner<SampleData>());
@@ -65,39 +66,20 @@ public class DataCarrierTest {
Assert.assertTrue(carrier.produce(new SampleData().setName("d")));
Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
- Buffer<SampleData> buffer1 = channels.getBuffer(0);
+ QueueBuffer<SampleData> buffer1 = channels.getBuffer(0);
List result = new ArrayList();
- buffer1.obtain(result, 0, 100);
+ buffer1.obtain(result);
Assert.assertEquals(2, result.size());
- Buffer<SampleData> buffer2 = channels.getBuffer(1);
- buffer2.obtain(result, 0, 100);
+ QueueBuffer<SampleData> buffer2 = channels.getBuffer(1);
+ buffer2.obtain(result);
Assert.assertEquals(4, result.size());
}
@Test
- public void testOverrideProduce() throws IllegalAccessException {
- DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
- carrier.setBufferStrategy(BufferStrategy.OVERRIDE);
-
- for (int i = 0; i < 500; i++) {
- Assert.assertTrue(carrier.produce(new SampleData().setName("d" + i)));
- }
-
- Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
- Buffer<SampleData> buffer1 = channels.getBuffer(0);
- List result = new ArrayList();
- buffer1.obtain(result, 0, 100);
-
- Buffer<SampleData> buffer2 = channels.getBuffer(1);
- buffer2.obtain(result, 0, 100);
- Assert.assertEquals(200, result.size());
- }
-
- @Test
public void testIfPossibleProduce() throws IllegalAccessException {
DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
@@ -111,12 +93,12 @@ public class DataCarrierTest {
}
Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
- Buffer<SampleData> buffer1 = channels.getBuffer(0);
+ QueueBuffer<SampleData> buffer1 = channels.getBuffer(0);
List result = new ArrayList();
- buffer1.obtain(result, 0, 100);
+ buffer1.obtain(result);
- Buffer<SampleData> buffer2 = channels.getBuffer(1);
- buffer2.obtain(result, 0, 100);
+ QueueBuffer<SampleData> buffer2 = channels.getBuffer(1);
+ buffer2.obtain(result);
Assert.assertEquals(200, result.size());
}
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePoolTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePoolTest.java
deleted file mode 100644
index 9cd9c8c..0000000
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePoolTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.apm.commons.datacarrier.consumer;
-
-import java.util.*;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
-import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
-import org.junit.*;
-
-/**
- * @author wusheng
- */
-public class BulkConsumePoolTest {
- @Test
- public void testOneThreadPool() throws InterruptedException {
- BulkConsumePool pool = new BulkConsumePool("testPool", 1, 50);
- final ArrayList<Object> result1 = new ArrayList();
- Channels c1 = new Channels(2, 10, new SimpleRollingPartitioner(), BufferStrategy.OVERRIDE);
- pool.add("test", c1,
- new IConsumer() {
- @Override public void init() {
-
- }
-
- @Override public void consume(List data) {
- for (Object datum : data) {
- result1.add(datum);
- }
- }
-
- @Override public void onError(List data, Throwable t) {
-
- }
-
- @Override public void onExit() {
-
- }
- });
- pool.begin(c1);
- final ArrayList<Object> result2 = new ArrayList();
- Channels c2 = new Channels(2, 10, new SimpleRollingPartitioner(), BufferStrategy.OVERRIDE);
- pool.add("test2", c2,
- new IConsumer() {
- @Override public void init() {
-
- }
-
- @Override public void consume(List data) {
- for (Object datum : data) {
- result2.add(datum);
- }
- }
-
- @Override public void onError(List data, Throwable t) {
-
- }
-
- @Override public void onExit() {
-
- }
- });
- pool.begin(c2);
- c1.save(new Object());
- c1.save(new Object());
- c1.save(new Object());
- c1.save(new Object());
- c1.save(new Object());
- c2.save(new Object());
- c2.save(new Object());
- Thread.sleep(2000);
-
- Assert.assertEquals(5, result1.size());
- Assert.assertEquals(2, result2.size());
- }
-}
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
index 4433b9c..b9dfd8f 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
@@ -80,7 +80,7 @@ public class ConsumerTest {
for (SampleData data : result) {
consumerCounter.add(data.getIntValue());
}
- Assert.assertEquals(5, consumerCounter.size());
+ Assert.assertEquals(2, consumerCounter.size());
}
@Test
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index e671ef4..5b86ae7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -113,7 +113,6 @@ public class GRPCRemoteClient implements RemoteClient {
synchronized (GRPCRemoteClient.class) {
if (Objects.isNull(this.carrier)) {
this.carrier = new DataCarrier<>("GRPCRemoteClient", channelSize, bufferSize);
- this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
}
}
}