You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/10/28 07:09:55 UTC
[skywalking] 01/01: Improve visibility for DataCarrier.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch queue-visibility
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit d239b85f369d463d72c50743788758c9130aec62
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Oct 28 15:09:35 2019 +0800
Improve visibility for DataCarrier.
---
.../apm/commons/datacarrier/buffer/Buffer.java | 26 ++++++-----
.../apm/commons/datacarrier/buffer/BufferItem.java | 52 ++++++++++++++++++++++
2 files changed, 67 insertions(+), 11 deletions(-)
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
index d869832..bd325a7 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
@@ -18,23 +18,25 @@
package org.apache.skywalking.apm.commons.datacarrier.buffer;
-import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
-import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
-
import java.util.LinkedList;
import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
+import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
/**
* Created by wusheng on 2016/10/25.
*/
public class Buffer<T> {
- private final Object[] buffer;
+ private final BufferItem[] buffer;
private BufferStrategy strategy;
private AtomicRangeInteger index;
private List<QueueBlockingCallback<T>> callbacks;
Buffer(int bufferSize, BufferStrategy strategy) {
- buffer = new Object[bufferSize];
+ buffer = new BufferItem[bufferSize];
+ for (int i = 0; i < bufferSize; i++) {
+ buffer[i] = new BufferItem();
+ }
this.strategy = strategy;
index = new AtomicRangeInteger(0, bufferSize);
callbacks = new LinkedList<QueueBlockingCallback<T>>();
@@ -50,11 +52,12 @@ public class Buffer<T> {
boolean save(T data) {
int i = index.getAndIncrement();
- if (buffer[i] != null) {
+ BufferItem bufferItem = buffer[i];
+ if (bufferItem.hasData()) {
switch (strategy) {
case BLOCKING:
boolean isFirstTimeBlocking = true;
- while (buffer[i] != null) {
+ while (bufferItem.hasData()) {
if (isFirstTimeBlocking) {
isFirstTimeBlocking = false;
for (QueueBlockingCallback<T> callback : callbacks) {
@@ -73,7 +76,7 @@ public class Buffer<T> {
default:
}
}
- buffer[i] = data;
+ bufferItem.setItem(data);
return true;
}
@@ -87,9 +90,10 @@ public class Buffer<T> {
public void obtain(List<T> consumeList, int start, int end) {
for (int i = start; i < end; i++) {
- if (buffer[i] != null) {
- consumeList.add((T)buffer[i]);
- buffer[i] = null;
+ Object dataItem = buffer[i].getItem();
+ if (dataItem != null) {
+ consumeList.add((T)dataItem);
+ buffer[i].clear();
}
}
}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferItem.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferItem.java
new file mode 100644
index 0000000..b5ee57d
--- /dev/null
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferItem.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.commons.datacarrier.buffer;
+
+/**
+ * A buffer item occupies every slot of the {@code org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer} and is
+ * never removed from it. Instead, each item provides a {@code volatile} field that holds the real data.
+ *
+ * @author wusheng
+ */
+public class BufferItem {
+ /**
+ * Holds the reference to the real data. {@code null} means no data.
+ */
+ private volatile Object item = null;
+
+ public Object getItem() {
+ return item;
+ }
+
+ public void setItem(Object item) {
+ this.item = item;
+ }
+
+ public boolean isEmpty() {
+ return this.item == null;
+ }
+
+ public boolean hasData() {
+ return !isEmpty();
+ }
+
+ public void clear() {
+ this.item = null;
+ }
+}