You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/06/14 10:30:12 UTC

[incubator-iotdb] 01/01: replace ConcurrentLinkedQueue with CircularArray

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch time_statstic_2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 72a250d871f1b5f4d72a16871524cccfb1f4a13d
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Fri Jun 14 18:29:52 2019 +0800

    replace ConcurrentLinkedQueue with CircularArray
---
 .../db/cost/statistic/ConcurrentCircularArray.java | 65 ++++++++++++++++++++++
 .../iotdb/db/cost/statistic/Measurement.java       | 54 +++++++++---------
 2 files changed, 91 insertions(+), 28 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/ConcurrentCircularArray.java b/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/ConcurrentCircularArray.java
new file mode 100644
index 0000000..8b3bd55
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/ConcurrentCircularArray.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.cost.statistic;
+
+import java.util.Queue;
+
+/**
+ * A fixed-size ring buffer of primitive {@code long} values.
+ *
+ * <p>Every public method is {@code synchronized} on the instance, so the indices and the backing
+ * array are only ever read or written while holding the instance lock.
+ *
+ * <p>One slot is always kept free to tell a full buffer from an empty one, so an instance created
+ * with {@code size} slots stores at most {@code size - 1} values. Instances created with a size of
+ * 0 or 1 therefore never accept any value.
+ */
+public class ConcurrentCircularArray {
+
+  /** Backing storage; its length is the {@code size} given to the constructor. */
+  private final long[] data;
+
+  /** Index of the slot the next {@link #put(long)} writes to. */
+  private int tail;
+
+  /** Index of the slot the next {@link #take()} reads from. */
+  private int head;
+
+  /**
+   * Creates an empty buffer.
+   *
+   * @param size number of slots to allocate; the usable capacity is {@code size - 1}
+   * @throws NegativeArraySizeException if {@code size} is negative
+   */
+  public ConcurrentCircularArray(int size) {
+    this.data = new long[size];
+    this.tail = 0;
+    this.head = 0;
+  }
+
+  /**
+   * Appends a value at the tail of the buffer.
+   *
+   * @param d the data
+   * @return true if the value was stored; false if there is no space.
+   */
+  public synchronized boolean put(long d) {
+    // A buffer with fewer than two slots can never hold a value. Checking this first also keeps
+    // the modulo below from dividing by zero when the buffer was created with size 0.
+    if (data.length < 2) {
+      return false;
+    }
+    int next = (tail + 1) % data.length;
+    if (next == head) {
+      // Advancing tail would make it equal to head, which is the "empty" state: buffer is full.
+      return false;
+    }
+    data[tail] = d;
+    tail = next;
+    return true;
+  }
+
+  /**
+   * Tells whether at least one value is waiting to be taken.
+   *
+   * @return true if the buffer is not empty
+   */
+  public synchronized boolean hasData() {
+    return tail != head;
+  }
+
+  /**
+   * Removes and returns the oldest value in the buffer.
+   *
+   * @return the oldest value, or -1 if there is no data. Because -1 can also be a stored value,
+   *     call {@link #hasData()} first to tell the two cases apart.
+   */
+  public synchronized long take() {
+    if (tail == head) {
+      return -1;
+    }
+    long result = data[head];
+    head = (head + 1) % data.length;
+    return result;
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java b/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java
index c261e82..687d7fa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java
@@ -57,12 +57,12 @@ public class Measurement implements MeasurementMBean, IService {
   /**
    * queue for async store time latencies.
    */
-  private Queue<Long>[] operationLatenciesQueue;
+  private ConcurrentCircularArray[] operationLatenciesQueue;
 
   /**
    * size of each queue, this is calculated by memory.
    */
-  private final long QUEUE_SIZE;
+  private final int QUEUE_SIZE;
 
   /**
    * latencies sum of each operation.
@@ -122,11 +122,11 @@ public class Measurement implements MeasurementMBean, IService {
     int memory_in_kb = tdbConfig.getPerformance_stat_memory_in_kb();
 
     QUEUE_SIZE = memory_in_kb * 1000 / Operation.values().length / 8;
-    operationLatenciesQueue = new ConcurrentLinkedQueue[Operation.values().length];
+    operationLatenciesQueue = new ConcurrentCircularArray[Operation.values().length];
     operationLatencies = new long[Operation.values().length];
     operationCnt = new long[Operation.values().length];
     for (Operation op : Operation.values()) {
-      operationLatenciesQueue[op.ordinal()] = new ConcurrentLinkedQueue<>();
+      operationLatenciesQueue[op.ordinal()] = new ConcurrentCircularArray(QUEUE_SIZE);
       operationCnt[op.ordinal()] = 0;
       operationLatencies[op.ordinal()] = 0;
     }
@@ -142,10 +142,11 @@ public class Measurement implements MeasurementMBean, IService {
     futureList = new ArrayList<>();
   }
 
-  public void addOperationLatency(Operation op, long startTime) {
-    if (isEnableStat && operationLatenciesQueue[op.ordinal()].size() < QUEUE_SIZE) {
-      operationLatenciesQueue[op.ordinal()].add((System.currentTimeMillis() - startTime));
+  public boolean addOperationLatency(Operation op, long startTime) { // true only if the latency was queued
+    if (isEnableStat) { // when set, queue (current time in ms - startTime) in the buffer for op
+      return operationLatenciesQueue[op.ordinal()].put((System.currentTimeMillis() - startTime));
     }
+    return false; // isEnableStat is off: nothing was recorded
   }
 
   @Override
@@ -160,7 +161,7 @@ public class Measurement implements MeasurementMBean, IService {
       Future future = service.scheduleWithFixedDelay(
           new Measurement.DisplayRunnable(), 20, displayIntervalInMs, TimeUnit.MILLISECONDS);
       futureList.add(future);
-      futureList.add(service.schedule(new QueueConsumerThread(), 10, TimeUnit.MILLISECONDS));
+      futureList.add(service.schedule(new QueueConsumerThread(), 0, TimeUnit.MILLISECONDS));
 
 
     } catch (Exception e) {
@@ -337,30 +338,27 @@ public class Measurement implements MeasurementMBean, IService {
     }
 
     private void consumer() {
-      int cnt = 0;
-      boolean allEmpty = false;
+      boolean allEmpty;
       while (true) {
-        cnt++;
-        if (cnt > 2 * QUEUE_SIZE || allEmpty) {
-          try {
-            Thread.sleep(1000);
-            cnt = 0;
-            allEmpty = false;
-            continue;
-          } catch (InterruptedException e) {
-            return;
-          }
-        }
         allEmpty = true;
         for (Operation op : Operation.values()) {
           int idx = op.ordinal();
-          Queue<Long> queue = operationLatenciesQueue[idx];
-          Long time = queue.poll();
-          if (time != null) {
-            operationLatencies[idx] += time;
-            operationCnt[idx]++;
-            operationHistogram[idx][calIndex(time)]++;
-            allEmpty = false;
+          ConcurrentCircularArray queue = operationLatenciesQueue[idx];
+          if (queue.hasData()) {
+            Long time = queue.take();
+            if (time != null) {
+              operationLatencies[idx] += time;
+              operationCnt[idx]++;
+              operationHistogram[idx][calIndex(time)]++;
+              allEmpty = false;
+            }
+          }
+        }
+        if (allEmpty) {
+          try {
+            Thread.sleep(10);
+          } catch (InterruptedException e) {
+            return;
           }
         }
       }