You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/06/14 10:30:12 UTC
[incubator-iotdb] 01/01: replace ConcurrentLinkedQueue with
CircularArray
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch time_statstic_2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 72a250d871f1b5f4d72a16871524cccfb1f4a13d
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Fri Jun 14 18:29:52 2019 +0800
replace ConcurrentLinkedQueue with CircularArray
---
.../db/cost/statistic/ConcurrentCircularArray.java | 65 ++++++++++++++++++++++
.../iotdb/db/cost/statistic/Measurement.java | 54 +++++++++---------
2 files changed, 91 insertions(+), 28 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/ConcurrentCircularArray.java b/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/ConcurrentCircularArray.java
new file mode 100644
index 0000000..8b3bd55
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/ConcurrentCircularArray.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.cost.statistic;
+
+import java.util.Queue;
+
+public class ConcurrentCircularArray { // fixed-size ring buffer of longs; put/hasData/take are synchronized on this instance
+ long[] data; // backing storage, allocated once in the constructor
+ int tail; // index of the next slot put() writes to
+ int head; // index of the next element take() reads
+
+ public ConcurrentCircularArray(int size) { // size is the array length; put() keeps one slot free, so at most size - 1 values are held
+ this.data = new long[size];
+ tail = head = 0; // head == tail means the buffer is empty
+ }
+
+ /**
+ * Appends a value at the tail without blocking; the value is dropped if the buffer is full.
+ * @param d the data
+ * @return true if the value was stored; false if there is no space.
+ */
+ public synchronized boolean put(long d) {
+ if ((tail + 1) % data.length == head) { // full: advancing tail would make it equal head, which means "empty"
+ return false;
+ }
+ data[tail++] = d;
+ tail = tail % data.length; // wrap around to the start of the array
+ return true;
+ }
+
+ public synchronized boolean hasData() { // true if at least one value is waiting to be taken
+ return tail != head;
+ }
+
+ /**
+ * Removes and returns the oldest stored value. A stored -1 cannot be told apart from "empty".
+ * @return -1 if there is no data. (However, you should call hasData() first to avoid returning -1)
+ */
+ public synchronized long take() {
+ if (tail != head) {
+ long result = data[head++];
+ head = head % data.length; // wrap around to the start of the array
+ return result;
+ } else {
+ return -1; // sentinel for an empty buffer
+ }
+ }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java b/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java
index c261e82..687d7fa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/cost/statistic/Measurement.java
@@ -57,12 +57,12 @@ public class Measurement implements MeasurementMBean, IService {
/**
* queue for async store time latencies.
*/
- private Queue<Long>[] operationLatenciesQueue;
+ private ConcurrentCircularArray[] operationLatenciesQueue;
/**
* size of each queue, this is calculated by memory.
*/
- private final long QUEUE_SIZE;
+ private final int QUEUE_SIZE;
/**
* latencies sum of each operation.
@@ -122,11 +122,11 @@ public class Measurement implements MeasurementMBean, IService {
int memory_in_kb = tdbConfig.getPerformance_stat_memory_in_kb();
QUEUE_SIZE = memory_in_kb * 1000 / Operation.values().length / 8;
- operationLatenciesQueue = new ConcurrentLinkedQueue[Operation.values().length];
+ operationLatenciesQueue = new ConcurrentCircularArray[Operation.values().length];
operationLatencies = new long[Operation.values().length];
operationCnt = new long[Operation.values().length];
for (Operation op : Operation.values()) {
- operationLatenciesQueue[op.ordinal()] = new ConcurrentLinkedQueue<>();
+ operationLatenciesQueue[op.ordinal()] = new ConcurrentCircularArray(QUEUE_SIZE);
operationCnt[op.ordinal()] = 0;
operationLatencies[op.ordinal()] = 0;
}
@@ -142,10 +142,11 @@ public class Measurement implements MeasurementMBean, IService {
futureList = new ArrayList<>();
}
- public void addOperationLatency(Operation op, long startTime) {
- if (isEnableStat && operationLatenciesQueue[op.ordinal()].size() < QUEUE_SIZE) {
- operationLatenciesQueue[op.ordinal()].add((System.currentTimeMillis() - startTime));
+ public boolean addOperationLatency(Operation op, long startTime) {
+ if (isEnableStat) {
+ return operationLatenciesQueue[op.ordinal()].put((System.currentTimeMillis() - startTime));
}
+ return false;
}
@Override
@@ -160,7 +161,7 @@ public class Measurement implements MeasurementMBean, IService {
Future future = service.scheduleWithFixedDelay(
new Measurement.DisplayRunnable(), 20, displayIntervalInMs, TimeUnit.MILLISECONDS);
futureList.add(future);
- futureList.add(service.schedule(new QueueConsumerThread(), 10, TimeUnit.MILLISECONDS));
+ futureList.add(service.schedule(new QueueConsumerThread(), 0, TimeUnit.MILLISECONDS));
} catch (Exception e) {
@@ -337,30 +338,27 @@ public class Measurement implements MeasurementMBean, IService {
}
private void consumer() {
- int cnt = 0;
- boolean allEmpty = false;
+ boolean allEmpty;
while (true) {
- cnt++;
- if (cnt > 2 * QUEUE_SIZE || allEmpty) {
- try {
- Thread.sleep(1000);
- cnt = 0;
- allEmpty = false;
- continue;
- } catch (InterruptedException e) {
- return;
- }
- }
allEmpty = true;
for (Operation op : Operation.values()) {
int idx = op.ordinal();
- Queue<Long> queue = operationLatenciesQueue[idx];
- Long time = queue.poll();
- if (time != null) {
- operationLatencies[idx] += time;
- operationCnt[idx]++;
- operationHistogram[idx][calIndex(time)]++;
- allEmpty = false;
+ ConcurrentCircularArray queue = operationLatenciesQueue[idx];
+ if (queue.hasData()) {
+ Long time = queue.take();
+ if (time != null) {
+ operationLatencies[idx] += time;
+ operationCnt[idx]++;
+ operationHistogram[idx][calIndex(time)]++;
+ allEmpty = false;
+ }
+ }
+ }
+ if (allEmpty) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ return;
}
}
}