You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/01/14 08:41:23 UTC
[incubator-iotdb] 01/04: s
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch DisableAlign
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit be4b8af0d06f2cd58c162b3f147e9f9f0237309a
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jan 14 15:56:03 2020 +0800
s
---
.../main/java/org/apache/iotdb/JDBCExample.java | 47 +----
.../db/query/dataset/NonAlignEngineDataSet.java | 226 ++++++++++++---------
2 files changed, 137 insertions(+), 136 deletions(-)
diff --git a/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java b/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
index 99a6193..cffdd32 100644
--- a/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
+++ b/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
@@ -1,21 +1,3 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
package org.apache.iotdb;
import java.sql.Connection;
@@ -31,23 +13,11 @@ public class JDBCExample {
Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
- statement.execute("SET STORAGE GROUP TO root.sg1");
- statement.execute("CREATE TIMESERIES root.sg1.d1.s1 WITH DATATYPE=INT64, ENCODING=RLE");
- statement.execute("CREATE TIMESERIES root.sg1.d1.s2 WITH DATATYPE=INT64, ENCODING=RLE");
- statement.execute("CREATE TIMESERIES root.sg1.d1.s3 WITH DATATYPE=INT64, ENCODING=RLE");
-
- for (int i = 0; i <= 100; i++) {
- statement.addBatch("insert into root.sg1.d1(timestamp, s1, s2, s3) values("+ i + "," + 1 + "," + 1 + "," + 1 + ")");
- }
- statement.executeBatch();
- statement.clearBatch();
-
- ResultSet resultSet = statement.executeQuery("select * from root where time <= 10");
- outputResult(resultSet);
- resultSet = statement.executeQuery("select count(*) from root");
- outputResult(resultSet);
- resultSet = statement.executeQuery("select count(*) from root where time >= 1 and time <= 100 group by ([0, 100], 20ms, 20ms)");
+ long startTime = System.currentTimeMillis();
+ ResultSet resultSet = statement.executeQuery("select * from root where time < 30000000");
outputResult(resultSet);
+ long endTime = System.currentTimeMillis();
+ System.out.println("Cost Time: " + (endTime - startTime));
}
}
@@ -61,15 +31,6 @@ public class JDBCExample {
}
System.out.println();
while (resultSet.next()) {
- for (int i = 1; ; i++) {
- System.out.print(resultSet.getString(i));
- if (i < columnCount) {
- System.out.print(", ");
- } else {
- System.out.println();
- break;
- }
- }
}
System.out.println("--------------------------\n");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
index 1474bba..c2e72e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
@@ -19,13 +19,6 @@
package org.apache.iotdb.db.query.dataset;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
@@ -42,28 +35,33 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
public class NonAlignEngineDataSet extends QueryDataSet {
-
+
private static class ReadTask implements Runnable {
-
+
private final ManagedSeriesReader reader;
- private BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue;
+ private BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue;
private WatermarkEncoder encoder;
- private int rowOffset;
- private int fetchSize;
- private int rowLimit;
- private int alreadyReturnedRowNum = 0;
-
- public ReadTask(ManagedSeriesReader reader,
- BlockingQueue<Pair<PublicBAOS, PublicBAOS>> blockingQueue,
- WatermarkEncoder encoder,
- int rowOffset, int rowLimit, int fetchSize) {
+ NonAlignEngineDataSet dataSet;
+ private int index;
+
+
+ public ReadTask(ManagedSeriesReader reader,
+ BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue,
+ WatermarkEncoder encoder, NonAlignEngineDataSet dataSet, int index) {
this.reader = reader;
this.blockingQueue = blockingQueue;
this.encoder = encoder;
- this.rowOffset = rowOffset;
- this.rowLimit = rowLimit;
- this.fetchSize = fetchSize;
+ this.dataSet = dataSet;
+ this.index = index;
}
@Override
@@ -75,18 +73,23 @@ public class NonAlignEngineDataSet extends QueryDataSet {
// if the task is submitted, there must be free space in the queue
// so here we don't need to check whether the queue has free space
// the reader has next batch
- if (reader.hasNextBatch()) {
- BatchData batchData = reader.nextBatch();
-
+ if ((dataSet.cachedBatchData[index] != null && dataSet.cachedBatchData[index].hasCurrent())
+ || reader.hasNextBatch()) {
+ BatchData batchData;
+ if (dataSet.cachedBatchData[index] != null && dataSet.cachedBatchData[index].hasCurrent())
+ batchData = dataSet.cachedBatchData[index];
+ else
+ batchData = reader.nextBatch();
+
int rowCount = 0;
- while (rowCount < fetchSize) {
-
- if ((rowLimit > 0 && alreadyReturnedRowNum >= rowLimit)) {
+ while (rowCount < dataSet.fetchSize) {
+
+ if ((dataSet.limit > 0 && dataSet.alreadyReturnedRowNumArray[index] >= dataSet.limit)) {
break;
}
-
+
if (batchData != null && batchData.hasCurrent()) {
- if (rowOffset == 0) {
+ if (dataSet.offsetArray[index] == 0) {
long time = batchData.currentTime();
ReadWriteIOUtils.write(time, timeBAOS);
TSDataType type = batchData.getDataType();
@@ -136,20 +139,42 @@ public class NonAlignEngineDataSet extends QueryDataSet {
batchData.next();
}
else {
- break;
+ if (reader.hasNextBatch()) {
+ batchData = reader.nextBatch();
+ dataSet.cachedBatchData[index] = batchData;
+ continue;
+ }
+ else
+ break;
}
- if (rowOffset == 0) {
+ if (dataSet.offsetArray[index] == 0) {
rowCount++;
- if (rowLimit > 0) {
- alreadyReturnedRowNum++;
+ if (dataSet.limit > 0) {
+ dataSet.alreadyReturnedRowNumArray[index]++;
}
} else {
- rowOffset--;
+ dataSet.offsetArray[index]--;
}
}
-
- Pair<PublicBAOS, PublicBAOS> timeValueBAOSPair = new Pair(timeBAOS, valueBAOS);
-
+ if (rowCount == 0) {
+ blockingQueue.put(new Pair(null, null));
+ // set the hasRemaining field in reader to false
+ // tell the Consumer not to submit another task for this reader any more
+ reader.setHasRemaining(false);
+ // remove itself from the QueryTaskPoolManager
+ reader.setManagedByQueryManager(false);
+ return;
+ }
+
+ ByteBuffer timeBuffer = ByteBuffer.allocate(timeBAOS.size());
+ timeBuffer.put(timeBAOS.getBuf(), 0, timeBAOS.size());
+ timeBuffer.flip();
+ ByteBuffer valueBuffer = ByteBuffer.allocate(valueBAOS.size());
+ valueBuffer.put(valueBAOS.getBuf(), 0, valueBAOS.size());
+ valueBuffer.flip();
+
+ Pair<ByteBuffer, ByteBuffer> timeValueBAOSPair = new Pair(timeBuffer, valueBuffer);
+
blockingQueue.put(timeValueBAOSPair);
// if the queue also has free space, just submit another itself
if (blockingQueue.remainingCapacity() > 0) {
@@ -162,7 +187,7 @@ public class NonAlignEngineDataSet extends QueryDataSet {
}
return;
}
- blockingQueue.put(new Pair(timeBAOS, valueBAOS));
+ blockingQueue.put(new Pair(null, null));
// set the hasRemaining field in reader to false
// tell the Consumer not to submit another task for this reader any more
reader.setHasRemaining(false);
@@ -176,19 +201,37 @@ public class NonAlignEngineDataSet extends QueryDataSet {
} catch (Exception e) {
LOGGER.error("Something gets wrong: ", e);
}
-
+
}
-
+
}
-
+
private List<ManagedSeriesReader> seriesReaderWithoutValueFilterList;
-
+
// Blocking queue list for each time value buffer pair
- private BlockingQueue<Pair<PublicBAOS, PublicBAOS>>[] blockingQueueArray;
-
+ private BlockingQueue<Pair<ByteBuffer, ByteBuffer>>[] blockingQueueArray;
+
private boolean initialized = false;
-
+
+ private int[] offsetArray;
+
+ private int limit;
+
+ private int[] alreadyReturnedRowNumArray;
+
+ private BatchData[] cachedBatchData;
+
+ // indicate that there is no more batch data in the corresponding queue
+ // in case that the consumer thread is blocked on the queue and won't get runnable any more
+ // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter
+ // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false
+ // noMoreDataInQueue can still be true
+ // its usage is to tell the consumer thread not to call the take() method.
+ private boolean[] noMoreDataInQueueArray;
+
+ private int fetchSize;
+
// indicate that there is no more batch data in the corresponding queue
// in case that the consumer thread is blocked on the queue and won't get runnable any more
// this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter
@@ -202,7 +245,7 @@ public class NonAlignEngineDataSet extends QueryDataSet {
private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
-
+
/**
* constructor of EngineDataSet.
*
@@ -211,102 +254,99 @@ public class NonAlignEngineDataSet extends QueryDataSet {
* @param readers readers in List(IPointReader) structure
*/
public NonAlignEngineDataSet(List<Path> paths, List<TSDataType> dataTypes,
- List<ManagedSeriesReader> readers) throws InterruptedException {
+ List<ManagedSeriesReader> readers) {
super(paths, dataTypes);
this.seriesReaderWithoutValueFilterList = readers;
blockingQueueArray = new BlockingQueue[readers.size()];
+ noMoreDataInQueueArray = new boolean[readers.size()];
for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
blockingQueueArray[i] = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
}
}
-
- private void init(WatermarkEncoder encoder, int fetchSize)
- throws InterruptedException {
+
+ private void initLimit(int offset, int limit, int size) {
+ offsetArray = new int[size];
+ Arrays.fill(offsetArray, offset);
+ this.limit = limit;
+ alreadyReturnedRowNumArray = new int[size];
+ cachedBatchData = new BatchData[size];
+ }
+
+ private void init(WatermarkEncoder encoder, int fetchSize) {
+ initLimit(super.rowOffset, super.rowLimit, seriesReaderWithoutValueFilterList.size());
+ this.fetchSize = fetchSize;
for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(i);
reader.setHasRemaining(true);
reader.setManagedByQueryManager(true);
- pool.submit(new ReadTask(reader, blockingQueueArray[i], encoder, rowOffset, rowLimit, fetchSize));
+ pool.submit(new ReadTask(reader, blockingQueueArray[i], encoder, this, i));
}
this.initialized = true;
}
-
+
/**
* for RPC in RawData query between client and server
* fill time buffers and value buffers
*/
- public TSQueryNonAlignDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException, InterruptedException {
+ public TSQueryNonAlignDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws InterruptedException {
if (!initialized) {
init(encoder, fetchSize);
}
int seriesNum = seriesReaderWithoutValueFilterList.size();
TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
- PublicBAOS[] timeBAOSList = new PublicBAOS[seriesNum];
- PublicBAOS[] valueBAOSList = new PublicBAOS[seriesNum];
-
+ List<ByteBuffer> timeBufferList = new ArrayList<>(seriesNum);
+ List<ByteBuffer> valueBufferList = new ArrayList<>(seriesNum);
+
for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+ if (!noMoreDataInQueueArray[seriesIndex]) {
+ Pair<ByteBuffer, ByteBuffer> timeValueByteBufferPair = blockingQueueArray[seriesIndex].take();
+ if (timeValueByteBufferPair.left == null || timeValueByteBufferPair.right == null) {
+ noMoreDataInQueueArray[seriesIndex] = true;
+ timeValueByteBufferPair.left = ByteBuffer.allocate(0);
+ timeValueByteBufferPair.right = ByteBuffer.allocate(0);
+ }
+ timeBufferList.add(timeValueByteBufferPair.left);
+ valueBufferList.add(timeValueByteBufferPair.right);
+ }
+ else {
+ timeBufferList.add(ByteBuffer.allocate(0));
+ valueBufferList.add(ByteBuffer.allocate(0));
+ continue;
+ }
+
synchronized (seriesReaderWithoutValueFilterList.get(seriesIndex)) {
if (blockingQueueArray[seriesIndex].remainingCapacity() > 0) {
ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(seriesIndex);
// if the reader isn't being managed and still has more data,
// that means this read task leave the pool before because the queue has no more space
// now we should submit it again
- if (reader.isManagedByQueryManager() && reader.hasRemaining()) {
+ if (!reader.isManagedByQueryManager() && reader.hasRemaining()) {
reader.setManagedByQueryManager(true);
- pool.submit(new ReadTask(reader, blockingQueueArray[seriesIndex],
- encoder, rowOffset, rowLimit, fetchSize));
+ pool.submit(new ReadTask(reader, blockingQueueArray[seriesIndex],
+ encoder, this, seriesIndex));
}
}
- Pair<PublicBAOS, PublicBAOS> timevalueBAOSPair = blockingQueueArray[seriesIndex].poll();
- if (timevalueBAOSPair != null) {
- timeBAOSList[seriesIndex] = timevalueBAOSPair.left;
- valueBAOSList[seriesIndex] = timevalueBAOSPair.right;
- }
- else {
- timeBAOSList[seriesIndex] = new PublicBAOS();
- valueBAOSList[seriesIndex] = new PublicBAOS();
- }
}
}
- List<ByteBuffer> timeBufferList = new ArrayList<>();
- List<ByteBuffer> valueBufferList = new ArrayList<>();
-
- for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
- // add time buffer of current series
- putPBOSToBuffer(timeBAOSList, timeBufferList, seriesIndex);
-
- // add value buffer of current series
- putPBOSToBuffer(valueBAOSList, valueBufferList, seriesIndex);
- }
-
// set time buffers, value buffers and bitmap buffers
tsQueryNonAlignDataSet.setTimeList(timeBufferList);
tsQueryNonAlignDataSet.setValueList(valueBufferList);
return tsQueryNonAlignDataSet;
}
-
- private void putPBOSToBuffer(PublicBAOS[] aBAOSList, List<ByteBuffer> aBufferList,
- int tsIndex) {
- ByteBuffer aBuffer = ByteBuffer.allocate(aBAOSList[tsIndex].size());
- aBuffer.put(aBAOSList[tsIndex].getBuf(), 0, aBAOSList[tsIndex].size());
- aBuffer.flip();
- aBufferList.add(aBuffer);
- }
+
@Override
- protected boolean hasNextWithoutConstraint() throws IOException {
- // TODO Auto-generated method stub
+ protected boolean hasNextWithoutConstraint() {
return false;
}
@Override
- protected RowRecord nextWithoutConstraint() throws IOException {
- // TODO Auto-generated method stub
+ protected RowRecord nextWithoutConstraint() {
return null;
}
-
+
}