You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/10/28 07:09:55 UTC

[skywalking] 01/01: Improve visibility for DataCarrier.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch queue-visibility
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit d239b85f369d463d72c50743788758c9130aec62
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Oct 28 15:09:35 2019 +0800

    Improve visibility for DataCarrier.
---
 .../apm/commons/datacarrier/buffer/Buffer.java     | 26 ++++++-----
 .../apm/commons/datacarrier/buffer/BufferItem.java | 52 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 11 deletions(-)

diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
index d869832..bd325a7 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Buffer.java
@@ -18,23 +18,25 @@
 
 package org.apache.skywalking.apm.commons.datacarrier.buffer;
 
-import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
-import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
-
 import java.util.LinkedList;
 import java.util.List;
+import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
+import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
 
 /**
  * Created by wusheng on 2016/10/25.
  */
 public class Buffer<T> {
-    private final Object[] buffer;
+    private final BufferItem[] buffer;
     private BufferStrategy strategy;
     private AtomicRangeInteger index;
     private List<QueueBlockingCallback<T>> callbacks;
 
     Buffer(int bufferSize, BufferStrategy strategy) {
-        buffer = new Object[bufferSize];
+        buffer = new BufferItem[bufferSize];
+        for (int i = 0; i < bufferSize; i++) {
+            buffer[i] = new BufferItem();
+        }
         this.strategy = strategy;
         index = new AtomicRangeInteger(0, bufferSize);
         callbacks = new LinkedList<QueueBlockingCallback<T>>();
@@ -50,11 +52,12 @@ public class Buffer<T> {
 
     boolean save(T data) {
         int i = index.getAndIncrement();
-        if (buffer[i] != null) {
+        BufferItem bufferItem = buffer[i];
+        if (bufferItem.hasData()) {
             switch (strategy) {
                 case BLOCKING:
                     boolean isFirstTimeBlocking = true;
-                    while (buffer[i] != null) {
+                    while (bufferItem.hasData()) {
                         if (isFirstTimeBlocking) {
                             isFirstTimeBlocking = false;
                             for (QueueBlockingCallback<T> callback : callbacks) {
@@ -73,7 +76,7 @@ public class Buffer<T> {
                 default:
             }
         }
-        buffer[i] = data;
+        bufferItem.setItem(data);
         return true;
     }
 
@@ -87,9 +90,10 @@ public class Buffer<T> {
 
     public void obtain(List<T> consumeList, int start, int end) {
         for (int i = start; i < end; i++) {
-            if (buffer[i] != null) {
-                consumeList.add((T)buffer[i]);
-                buffer[i] = null;
+            Object dataItem = buffer[i].getItem();
+            if (dataItem != null) {
+                consumeList.add((T)dataItem);
+                buffer[i].clear();
             }
         }
     }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferItem.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferItem.java
new file mode 100644
index 0000000..b5ee57d
--- /dev/null
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/BufferItem.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.commons.datacarrier.buffer;
+
+/**
+ * Buffer items fill the whole {@code org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer} and are never
+ * removed from it. Each item provides a `volatile` field to hold the real data.
+ *
+ * @author wusheng
+ */
+public class BufferItem {
+    /**
+     * Holds the reference to the real data. `null` means no data.
+     */
+    private volatile Object item = null;
+
+    public Object getItem() {
+        return item;
+    }
+
+    public void setItem(Object item) {
+        this.item = item;
+    }
+
+    public boolean isEmpty() {
+        return this.item == null;
+    }
+
+    public boolean hasData() {
+        return !isEmpty();
+    }
+
+    public void clear() {
+        this.item = null;
+    }
+}