You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/01/14 08:41:26 UTC
[incubator-iotdb] 04/04: Disable align
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch DisableAlign
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 3ee98df2f6659a55dd407a1e28d1ebadddf304b5
Merge: 8f8d570 7c0f1d5
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Tue Jan 14 16:40:41 2020 +0800
Disable align
NOTICE | 2 +-
NOTICE-binary | 2 +-
RELEASE_NOTES.md | 28 +
client-py/compile.bat | 5 +-
client-py/readme.md | 2 +-
client-py/src/client_example.py | 2 +
.../org/apache/iotdb/client/AbstractClient.java | 2 +-
.../UserGuide/3-Server/4-Config Manual.md | 37 +-
.../5-Operation Manual/4-SQL Reference.md | 13 +-
.../UserGuide/3-Server/4-Config Manual.md | 39 +-
.../4-Client/4-Programming - Other Languages.md | 53 +-
.../5-Operation Manual/4-SQL Reference.md | 13 +-
.../8-System Design (Developer)/1-Hierarchy.md | 6 +-
.../main/java/org/apache/iotdb/JDBCExample.java | 2 +-
server/pom.xml | 12 +
.../resources/conf/iotdb-engine.properties | 15 +-
server/src/assembly/resources/conf/iotdb-env.bat | 31 +-
server/src/assembly/resources/conf/iotdb-env.sh | 31 +-
server/src/assembly/resources/conf/logback.xml | 12 +-
.../src/assembly/resources/sbin/start-server.bat | 6 +-
server/src/assembly/resources/sbin/start-server.sh | 6 +-
.../tsfileToolSet/print-tsfile-resource-files.sh | 0
.../tools/tsfileToolSet/print-tsfile-sketch.sh | 0
.../org/apache/iotdb/db/qp/strategy/SqlBase.g4 | 6 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 102 ++-
.../org/apache/iotdb/db/conf/IoTDBConfigCheck.java | 48 +-
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 7 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 39 +-
.../db/conf/adapter/IoTDBConfigDynamicAdapter.java | 5 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 39 +-
.../iotdb/db/engine/flush/MemTableFlushTask.java | 24 +-
.../iotdb/db/engine/flush/TsFileFlushPolicy.java | 2 +-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 8 +-
.../apache/iotdb/db/engine/memtable/IMemTable.java | 5 +-
.../db/engine/memtable/IWritableMemChunk.java | 16 +-
.../iotdb/db/engine/memtable/WritableMemChunk.java | 80 +-
.../db/engine/merge/manage/MergeResource.java | 3 +-
.../db/engine/modification/ModificationFile.java | 2 +-
.../io/LocalTextModificationAccessor.java | 2 +-
.../engine/modification/io/ModificationWriter.java | 1 +
.../engine/storagegroup/StorageGroupProcessor.java | 813 ++++++++++++++-------
.../db/engine/storagegroup/TsFileProcessor.java | 85 +--
.../db/engine/storagegroup/TsFileResource.java | 7 +-
.../version/SimpleFileVersionController.java | 51 +-
.../java/org/apache/iotdb/db/metadata/MGraph.java | 2 +-
.../java/org/apache/iotdb/db/metadata/MTree.java | 2 +-
.../apache/iotdb/db/qp/constant/DatetimeUtils.java | 2 +-
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 52 +-
.../iotdb/db/qp/physical/crud/BatchInsertPlan.java | 56 +-
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 34 +-
.../iotdb/db/query/context/QueryContext.java | 19 +-
.../db/query/dataset/NonAlignEngineDataSet.java | 22 +-
.../iotdb/db/query/executor/EngineExecutor.java | 8 +-
.../org/apache/iotdb/db/rescon/MemTablePool.java | 24 +-
.../java/org/apache/iotdb/db/service/IoTDB.java | 6 +-
.../org/apache/iotdb/db/service/JDBCService.java | 9 +-
.../apache/iotdb/db/service/MetricsService.java | 62 +-
.../apache/iotdb/db/service/RegisterManager.java | 3 +
.../iotdb/db/sync/conf/SyncSenderConfig.java | 18 -
.../iotdb/db/sync/receiver/SyncServerManager.java | 9 +-
.../iotdb/db/sync/receiver/load/FileLoader.java | 2 +-
.../db/sync/receiver/load/FileLoaderManager.java | 9 +-
.../db/sync/sender/manage/ISyncFileManager.java | 10 +-
.../db/sync/sender/manage/SyncFileManager.java | 137 ++--
.../iotdb/db/sync/sender/transfer/ISyncClient.java | 6 +-
.../iotdb/db/sync/sender/transfer/SyncClient.java | 120 +--
.../FileUtils.java} | 46 +-
.../java/org/apache/iotdb/db/utils/SyncUtils.java | 10 +-
.../iotdb/db/utils/datastructure/BinaryTVList.java | 30 +
.../db/utils/datastructure/BooleanTVList.java | 30 +
.../iotdb/db/utils/datastructure/DoubleTVList.java | 30 +
.../iotdb/db/utils/datastructure/FloatTVList.java | 30 +
.../iotdb/db/utils/datastructure/IntTVList.java | 30 +
.../iotdb/db/utils/datastructure/LongTVList.java | 30 +
.../iotdb/db/utils/datastructure/TVList.java | 30 +-
.../iotdb/db/writelog/recover/LogReplayer.java | 13 +-
.../writelog/recover/TsFileRecoverPerformer.java | 2 +-
.../db/conf/adapter/CompressionRatioTest.java | 6 -
.../adapter/IoTDBConfigDynamicAdapterTest.java | 9 +-
.../db/engine/cache/DeviceMetaDataCacheTest.java | 5 +-
.../iotdb/db/engine/merge/MergeTaskTest.java | 9 +-
.../apache/iotdb/db/engine/merge/MergeTest.java | 10 +-
.../engine/modification/DeletionFileNodeTest.java | 56 +-
.../engine/modification/ModificationFileTest.java | 8 +-
.../io/LocalTextModificationAccessorTest.java | 4 +-
.../storagegroup/StorageGroupProcessorTest.java | 14 +-
.../iotdb/db/engine/storagegroup/TTLTest.java | 61 +-
.../engine/storagegroup/TsFileProcessorTest.java | 8 +-
.../version/SimpleFileVersionControllerTest.java | 4 +-
.../iotdb/db/integration/IOTDBGroupByIT.java | 29 +-
.../integration/IOTDBGroupByInnerIntervalIT.java | 6 -
.../iotdb/db/integration/IoTDBAggregationIT.java | 42 +-
.../integration/IoTDBAggregationLargeDataIT.java | 8 +-
.../integration/IoTDBAggregationSmallDataIT.java | 6 -
.../iotdb/db/integration/IoTDBAuthorizationIT.java | 18 +-
.../db/integration/IoTDBAutoCreateSchemaIT.java | 6 -
.../apache/iotdb/db/integration/IoTDBCloseIT.java | 6 -
.../iotdb/db/integration/IoTDBCompleteIT.java | 6 -
.../apache/iotdb/db/integration/IoTDBDaemonIT.java | 6 +-
.../db/integration/IoTDBDeleteStorageGroupIT.java | 6 -
.../iotdb/db/integration/IoTDBDeletionIT.java | 16 +-
.../db/integration/IoTDBEngineTimeGeneratorIT.java | 5 -
.../apache/iotdb/db/integration/IoTDBFillIT.java | 6 -
.../db/integration/IoTDBFloatPrecisionIT.java | 6 -
.../db/integration/IoTDBFlushQueryMergeTest.java | 13 +-
.../iotdb/db/integration/IoTDBGroupbyDeviceIT.java | 5 -
.../iotdb/db/integration/IoTDBLargeDataIT.java | 5 -
.../iotdb/db/integration/IoTDBLimitSlimitIT.java | 5 -
.../integration/IoTDBLoadExternalTsfileTest.java | 56 +-
.../iotdb/db/integration/IoTDBMergeTest.java | 6 -
.../iotdb/db/integration/IoTDBMetadataFetchIT.java | 6 -
.../iotdb/db/integration/IoTDBMultiSeriesIT.java | 50 +-
.../db/integration/IoTDBMultiStatementsIT.java | 6 -
.../iotdb/db/integration/IoTDBNumberPathIT.java | 6 -
.../iotdb/db/integration/IoTDBQueryDemoIT.java | 4 -
.../iotdb/db/integration/IoTDBQuotedPathIT.java | 5 -
...IoTDBAggregationIT.java => IoTDBRecoverIT.java} | 317 ++------
.../db/integration/IoTDBSequenceDataQueryIT.java | 5 -
.../iotdb/db/integration/IoTDBSeriesReaderIT.java | 5 -
.../iotdb/db/integration/IoTDBSimpleQueryTest.java | 5 -
.../iotdb/db/integration/IoTDBTimeZoneIT.java | 5 -
.../apache/iotdb/db/integration/IoTDBTtlIT.java | 6 -
.../iotdb/db/integration/IoTDBVersionIT.java | 6 -
.../apache/iotdb/db/qp/plan/PhysicalPlanTest.java | 59 +-
.../fileRelated/UnSealedTsFileReaderTest.java | 13 +-
.../NewUnseqResourceMergeReaderTest.java | 13 +-
.../resourceRelated/SeqResourceReaderTest.java | 7 +-
.../resourceRelated/UnseqResourceReaderTest.java | 5 +-
.../db/sync/receiver/load/FileLoaderTest.java | 55 +-
.../recover/SyncReceiverLogAnalyzerTest.java | 7 +-
.../db/sync/sender/manage/SyncFileManagerTest.java | 186 +++--
.../sender/recover/SyncSenderLogAnalyzerTest.java | 64 +-
.../apache/iotdb/db/tools/IoTDBWatermarkTest.java | 16 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 59 +-
.../iotdb/db/writelog/IoTDBLogFileSizeTest.java | 7 -
.../apache/iotdb/db/writelog/PerformanceTest.java | 2 +
.../db/writelog/recover/SeqTsFileRecoverTest.java | 2 +-
.../writelog/recover/UnseqTsFileRecoverTest.java | 3 +-
server/src/test/resources/logback.xml | 2 +-
service-rpc/src/pypi/setup.py | 2 +-
session/pom.xml | 9 +-
.../java/org/apache/iotdb/session/Session.java | 129 +++-
.../org/apache/iotdb/session/IoTDBSessionIT.java | 335 ++++++++-
.../iotdb/session/utils/EnvironmentUtils.java | 190 -----
.../iotdb/tsfile/file/metadata/ChunkMetaData.java | 3 +-
.../tsfile/fileSystem/fsFactory/FSFactory.java | 67 ++
.../tsfile/fileSystem/fsFactory/HDFSFactory.java | 15 +
.../fileSystem/fsFactory/LocalFSFactory.java | 9 +
148 files changed, 2871 insertions(+), 1827 deletions(-)
diff --cc client/src/main/java/org/apache/iotdb/client/AbstractClient.java
index 0531ba0,23c2440..a5a85eb
--- a/client/src/main/java/org/apache/iotdb/client/AbstractClient.java
+++ b/client/src/main/java/org/apache/iotdb/client/AbstractClient.java
@@@ -527,33 -485,16 +527,33 @@@ public abstract class AbstractClient
}
maxValueLength = tmp;
}
- for (int i = 2; i <= colCount; i++) {
- if (i == 2 && resultSetMetaData.getColumnName(2).equals(GROUPBY_DEVICE_COLUMN_NAME)) {
- blockLine.append(StringUtils.repeat('-', deviceColumnLength)).append("+");
- } else {
- blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+");
+ if (printTimestamp) {
+ for (int i = 2; i <= colCount; i++) {
+ if (i == 2 && resultSetMetaData.getColumnName(2).equals(GROUPBY_DEVICE_COLUMN_NAME)) {
+ blockLine.append(StringUtils.repeat('-', deviceColumnLength)).append("+");
+ } else {
+ blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+");
+ }
+ }
+ } else {
+ for (int i = 1; i <= colCount; i++) {
+ blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+");
}
}
- } else {
- blockLine.append("+");
+ }
+ // for disable align clause
+ else {
+ int tmp = Integer.MIN_VALUE;
for (int i = 1; i <= colCount; i++) {
+ int len = resultSetMetaData.getColumnLabel(i).length();
+ tmp = Math.max(tmp, len);
+ }
+ maxValueLength = tmp;
+ blockLine.append("+");
+ for (int i = 2; i <= colCount / 2 + 1; i++) {
+ if (printTimestamp) {
+ blockLine.append(StringUtils.repeat('-', maxTimeLength)).append("+");
- }
++ }
blockLine.append(StringUtils.repeat('-', maxValueLength)).append("+");
}
}
diff --cc example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
index cffdd32,99a6193..8b4bc7b
--- a/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
+++ b/example/jdbc/src/main/java/org/apache/iotdb/JDBCExample.java
@@@ -13,11 -31,23 +13,11 @@@ public class JDBCExample
Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
- statement.execute("SET STORAGE GROUP TO root.sg1");
- statement.execute("CREATE TIMESERIES root.sg1.d1.s1 WITH DATATYPE=INT64, ENCODING=RLE");
- statement.execute("CREATE TIMESERIES root.sg1.d1.s2 WITH DATATYPE=INT64, ENCODING=RLE");
- statement.execute("CREATE TIMESERIES root.sg1.d1.s3 WITH DATATYPE=INT64, ENCODING=RLE");
-
- for (int i = 0; i <= 100; i++) {
- statement.addBatch("insert into root.sg1.d1(timestamp, s1, s2, s3) values("+ i + "," + 1 + "," + 1 + "," + 1 + ")");
- }
- statement.executeBatch();
- statement.clearBatch();
-
- ResultSet resultSet = statement.executeQuery("select * from root where time <= 10");
- outputResult(resultSet);
- resultSet = statement.executeQuery("select count(*) from root");
- outputResult(resultSet);
- resultSet = statement.executeQuery("select count(*) from root where time >= 1 and time <= 100 group by ([0, 100], 20ms, 20ms)");
+ long startTime = System.currentTimeMillis();
- ResultSet resultSet = statement.executeQuery("select * from root where time < 30000000");
++ ResultSet resultSet = statement.executeQuery("select * from root where time < 100000000 disable align");
outputResult(resultSet);
+ long endTime = System.currentTimeMillis();
+ System.out.println("Cost Time: " + (endTime - startTime));
}
}
diff --cc server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
index 09c648f,479849a..e4e8e31
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
@@@ -18,30 -18,31 +18,27 @@@
*/
package org.apache.iotdb.db.conf;
--import java.io.File;
--import java.io.FileInputStream;
--import java.io.FileOutputStream;
--import java.io.IOException;
--import java.io.InputStreamReader;
--import java.util.Properties;
--
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
++import java.io.*;
++import java.util.Properties;
++
public class IoTDBConfigCheck {
// this file is located in data/system/schema/system_properties.
// If user delete folder "data", system_properties can reset.
public static final String PROPERTIES_FILE_NAME = "system.properties";
public static final String SCHEMA_DIR =
-- IoTDBDescriptor.getInstance().getConfig().getSchemaDir();
++ IoTDBDescriptor.getInstance().getConfig().getSchemaDir();
private static final IoTDBConfigCheck INSTANCE = new IoTDBConfigCheck();
private static final Logger logger = LoggerFactory.getLogger(IoTDBDescriptor.class);
- private Properties properties = new Properties();
// this is a initial parameter.
private static String TIMESTAMP_PRECISION = "ms";
+ private static long PARTITION_INTERVAL = 86400;
+ private Properties properties = new Properties();
public static final IoTDBConfigCheck getInstance() {
return IoTDBConfigCheck.INSTANCE;
@@@ -49,6 -50,24 +46,24 @@@
public void checkConfig() {
TIMESTAMP_PRECISION = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
+
+ // check time stamp precision
+ if (!(TIMESTAMP_PRECISION.equals("ms") || TIMESTAMP_PRECISION.equals("us")
- || TIMESTAMP_PRECISION.equals("ns"))) {
++ || TIMESTAMP_PRECISION.equals("ns"))) {
+ logger.error("Wrong timestamp precision, please set as: ms, us or ns ! Current is: "
- + TIMESTAMP_PRECISION);
++ + TIMESTAMP_PRECISION);
+ System.exit(-1);
+ }
+
+ PARTITION_INTERVAL = IoTDBDescriptor.getInstance().getConfig()
- .getPartitionInterval();
++ .getPartitionInterval();
+
+ // check partition interval
+ if (PARTITION_INTERVAL <= 0) {
+ logger.error("Partition interval must larger than 0!");
+ System.exit(-1);
+ }
+
createDir(SCHEMA_DIR);
checkFile(SCHEMA_DIR);
logger.info("System configuration is ok.");
@@@ -65,7 -84,8 +80,8 @@@
private void checkFile(String filepath) {
// create file : read timestamp precision from engine.properties, create system_properties.txt
// use output stream to write timestamp precision to file.
- File file = SystemFileFactory.INSTANCE.getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
+ File file = SystemFileFactory.INSTANCE
- .getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
++ .getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
try {
if (!file.exists()) {
file.createNewFile();
@@@ -79,12 -100,19 +96,19 @@@
logger.error("Can not create {}.", file.getAbsolutePath(), e);
}
// get existed properties from system_properties.txt
- File inputFile = SystemFileFactory.INSTANCE.getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
+ File inputFile = SystemFileFactory.INSTANCE
- .getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
++ .getFile(filepath + File.separator + PROPERTIES_FILE_NAME);
try (FileInputStream inputStream = new FileInputStream(inputFile.toString())) {
properties.load(new InputStreamReader(inputStream, TSFileConfig.STRING_CHARSET));
if (!properties.getProperty("timestamp_precision").equals(TIMESTAMP_PRECISION)) {
logger.error("Wrong timestamp precision, please set as: " + properties
-- .getProperty("timestamp_precision") + " !");
++ .getProperty("timestamp_precision") + " !");
+ System.exit(-1);
+ }
+ if (!(Long.parseLong(properties.getProperty("storage_group_time_range"))
- == PARTITION_INTERVAL)) {
++ == PARTITION_INTERVAL)) {
+ logger.error("Wrong storage group time range, please set as: " + properties
- .getProperty("storage_group_time_range") + " !");
++ .getProperty("storage_group_time_range") + " !");
System.exit(-1);
}
} catch (IOException e) {
diff --cc server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
index c2e72e5,0000000..b024450
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NonAlignEngineDataSet.java
@@@ -1,352 -1,0 +1,354 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset;
+
+import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
+import org.apache.iotdb.db.query.reader.ManagedSeriesReader;
+import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
+import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
++import java.util.concurrent.atomic.AtomicIntegerArray;
+
+public class NonAlignEngineDataSet extends QueryDataSet {
+
+ private static class ReadTask implements Runnable {
+
+ private final ManagedSeriesReader reader;
+ private BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue;
+ private WatermarkEncoder encoder;
+ NonAlignEngineDataSet dataSet;
+ private int index;
+
+
+ public ReadTask(ManagedSeriesReader reader,
+ BlockingQueue<Pair<ByteBuffer, ByteBuffer>> blockingQueue,
+ WatermarkEncoder encoder, NonAlignEngineDataSet dataSet, int index) {
+ this.reader = reader;
+ this.blockingQueue = blockingQueue;
+ this.encoder = encoder;
+ this.dataSet = dataSet;
+ this.index = index;
+ }
+
+ @Override
+ public void run() {
+ PublicBAOS timeBAOS = new PublicBAOS();
+ PublicBAOS valueBAOS = new PublicBAOS();
+ try {
+ synchronized (reader) {
+ // if the task is submitted, there must be free space in the queue
+ // so here we don't need to check whether the queue has free space
+ // the reader has next batch
+ if ((dataSet.cachedBatchData[index] != null && dataSet.cachedBatchData[index].hasCurrent())
+ || reader.hasNextBatch()) {
+ BatchData batchData;
+ if (dataSet.cachedBatchData[index] != null && dataSet.cachedBatchData[index].hasCurrent())
+ batchData = dataSet.cachedBatchData[index];
+ else
+ batchData = reader.nextBatch();
+
+ int rowCount = 0;
+ while (rowCount < dataSet.fetchSize) {
+
- if ((dataSet.limit > 0 && dataSet.alreadyReturnedRowNumArray[index] >= dataSet.limit)) {
++ if ((dataSet.limit > 0 && dataSet.alreadyReturnedRowNumArray.get(index) >= dataSet.limit)) {
+ break;
+ }
+
+ if (batchData != null && batchData.hasCurrent()) {
- if (dataSet.offsetArray[index] == 0) {
++ if (dataSet.offsetArray.get(index) == 0) {
+ long time = batchData.currentTime();
+ ReadWriteIOUtils.write(time, timeBAOS);
+ TSDataType type = batchData.getDataType();
+ switch (type) {
+ case INT32:
+ int intValue = batchData.getInt();
+ if (encoder != null && encoder.needEncode(time)) {
+ intValue = encoder.encodeInt(intValue, time);
+ }
+ ReadWriteIOUtils.write(intValue, valueBAOS);
+ break;
+ case INT64:
+ long longValue = batchData.getLong();
+ if (encoder != null && encoder.needEncode(time)) {
+ longValue = encoder.encodeLong(longValue, time);
+ }
+ ReadWriteIOUtils.write(longValue, valueBAOS);
+ break;
+ case FLOAT:
+ float floatValue = batchData.getFloat();
+ if (encoder != null && encoder.needEncode(time)) {
+ floatValue = encoder.encodeFloat(floatValue, time);
+ }
+ ReadWriteIOUtils.write(floatValue, valueBAOS);
+ break;
+ case DOUBLE:
+ double doubleValue = batchData.getDouble();
+ if (encoder != null && encoder.needEncode(time)) {
+ doubleValue = encoder.encodeDouble(doubleValue, time);
+ }
+ ReadWriteIOUtils.write(doubleValue, valueBAOS);
+ break;
+ case BOOLEAN:
+ ReadWriteIOUtils.write(batchData.getBoolean(),
+ valueBAOS);
+ break;
+ case TEXT:
+ ReadWriteIOUtils
+ .write(batchData.getBinary(),
+ valueBAOS);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", type));
+ }
+ }
+ batchData.next();
+ }
+ else {
+ if (reader.hasNextBatch()) {
+ batchData = reader.nextBatch();
+ dataSet.cachedBatchData[index] = batchData;
+ continue;
+ }
+ else
+ break;
+ }
- if (dataSet.offsetArray[index] == 0) {
++ if (dataSet.offsetArray.get(index) == 0) {
+ rowCount++;
+ if (dataSet.limit > 0) {
- dataSet.alreadyReturnedRowNumArray[index]++;
++ dataSet.alreadyReturnedRowNumArray.incrementAndGet(index);
+ }
+ } else {
- dataSet.offsetArray[index]--;
++ dataSet.offsetArray.decrementAndGet(index);
+ }
+ }
+ if (rowCount == 0) {
+ blockingQueue.put(new Pair(null, null));
+ // set the hasRemaining field in reader to false
+ // tell the Consumer not to submit another task for this reader any more
+ reader.setHasRemaining(false);
+ // remove itself from the QueryTaskPoolManager
+ reader.setManagedByQueryManager(false);
+ return;
+ }
+
+ ByteBuffer timeBuffer = ByteBuffer.allocate(timeBAOS.size());
+ timeBuffer.put(timeBAOS.getBuf(), 0, timeBAOS.size());
+ timeBuffer.flip();
+ ByteBuffer valueBuffer = ByteBuffer.allocate(valueBAOS.size());
+ valueBuffer.put(valueBAOS.getBuf(), 0, valueBAOS.size());
+ valueBuffer.flip();
+
+ Pair<ByteBuffer, ByteBuffer> timeValueBAOSPair = new Pair(timeBuffer, valueBuffer);
+
+ blockingQueue.put(timeValueBAOSPair);
+ // if the queue also has free space, just submit another itself
+ if (blockingQueue.remainingCapacity() > 0) {
+ pool.submit(this);
+ }
+ // the queue has no more space
+ // remove itself from the QueryTaskPoolManager
+ else {
+ reader.setManagedByQueryManager(false);
+ }
+ return;
+ }
+ blockingQueue.put(new Pair(null, null));
+ // set the hasRemaining field in reader to false
+ // tell the Consumer not to submit another task for this reader any more
+ reader.setHasRemaining(false);
+ // remove itself from the QueryTaskPoolManager
+ reader.setManagedByQueryManager(false);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupted while putting into the blocking queue: ", e);
+ } catch (IOException e) {
+ LOGGER.error("Something gets wrong while reading from the series reader: ", e);
+ } catch (Exception e) {
+ LOGGER.error("Something gets wrong: ", e);
+ }
+
+ }
+
+ }
+
+
+ private List<ManagedSeriesReader> seriesReaderWithoutValueFilterList;
+
+ // Blocking queue list for each time value buffer pair
+ private BlockingQueue<Pair<ByteBuffer, ByteBuffer>>[] blockingQueueArray;
+
+ private boolean initialized = false;
+
- private int[] offsetArray;
++ private AtomicIntegerArray offsetArray;
+
+ private int limit;
+
- private int[] alreadyReturnedRowNumArray;
++ private AtomicIntegerArray alreadyReturnedRowNumArray;
+
+ private BatchData[] cachedBatchData;
+
+ // indicate that there is no more batch data in the corresponding queue
+ // in case that the consumer thread is blocked on the queue and won't get runnable any more
+ // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter
+ // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false
+ // noMoreDataInQueue can still be true
+ // its usage is to tell the consumer thread not to call the take() method.
+ private boolean[] noMoreDataInQueueArray;
+
+ private int fetchSize;
+
+ // indicate that there is no more batch data in the corresponding queue
+ // in case that the consumer thread is blocked on the queue and won't get runnable any more
+ // this field is not same as the `hasRemaining` in SeriesReaderWithoutValueFilter
+ // even though the `hasRemaining` in SeriesReaderWithoutValueFilter is false
+ // noMoreDataInQueue can still be true
+ // its usage is to tell the consumer thread not to call the take() method.
+
+ // capacity for blocking queue
+ private static final int BLOCKING_QUEUE_CAPACITY = 5;
+
+ private static final QueryTaskPoolManager pool = QueryTaskPoolManager.getInstance();
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(NonAlignEngineDataSet.class);
+
+ /**
+ * constructor of EngineDataSet.
+ *
+ * @param paths paths in List structure
+ * @param dataTypes time series data type
+ * @param readers readers in List(IPointReader) structure
+ */
+ public NonAlignEngineDataSet(List<Path> paths, List<TSDataType> dataTypes,
+ List<ManagedSeriesReader> readers) {
+ super(paths, dataTypes);
+ this.seriesReaderWithoutValueFilterList = readers;
+ blockingQueueArray = new BlockingQueue[readers.size()];
+ noMoreDataInQueueArray = new boolean[readers.size()];
+ for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
+ blockingQueueArray[i] = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
+ }
+ }
+
+ private void initLimit(int offset, int limit, int size) {
- offsetArray = new int[size];
- Arrays.fill(offsetArray, offset);
++ int[] offsetArrayTemp = new int[size];
++ Arrays.fill(offsetArrayTemp, offset);
++ offsetArray = new AtomicIntegerArray(offsetArrayTemp);
+ this.limit = limit;
- alreadyReturnedRowNumArray = new int[size];
++ this.alreadyReturnedRowNumArray = new AtomicIntegerArray(size);
+ cachedBatchData = new BatchData[size];
+ }
+
+ private void init(WatermarkEncoder encoder, int fetchSize) {
+ initLimit(super.rowOffset, super.rowLimit, seriesReaderWithoutValueFilterList.size());
+ this.fetchSize = fetchSize;
+ for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
+ ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(i);
+ reader.setHasRemaining(true);
+ reader.setManagedByQueryManager(true);
+ pool.submit(new ReadTask(reader, blockingQueueArray[i], encoder, this, i));
+ }
+ this.initialized = true;
+ }
+
+ /**
+ * for RPC in RawData query between client and server
+ * fill time buffers and value buffers
+ */
+ public TSQueryNonAlignDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws InterruptedException {
+ if (!initialized) {
+ init(encoder, fetchSize);
+ }
+ int seriesNum = seriesReaderWithoutValueFilterList.size();
+ TSQueryNonAlignDataSet tsQueryNonAlignDataSet = new TSQueryNonAlignDataSet();
+
+ List<ByteBuffer> timeBufferList = new ArrayList<>(seriesNum);
+ List<ByteBuffer> valueBufferList = new ArrayList<>(seriesNum);
+
+ for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+ if (!noMoreDataInQueueArray[seriesIndex]) {
+ Pair<ByteBuffer, ByteBuffer> timeValueByteBufferPair = blockingQueueArray[seriesIndex].take();
+ if (timeValueByteBufferPair.left == null || timeValueByteBufferPair.right == null) {
+ noMoreDataInQueueArray[seriesIndex] = true;
+ timeValueByteBufferPair.left = ByteBuffer.allocate(0);
+ timeValueByteBufferPair.right = ByteBuffer.allocate(0);
+ }
+ timeBufferList.add(timeValueByteBufferPair.left);
+ valueBufferList.add(timeValueByteBufferPair.right);
+ }
+ else {
+ timeBufferList.add(ByteBuffer.allocate(0));
+ valueBufferList.add(ByteBuffer.allocate(0));
+ continue;
+ }
+
+ synchronized (seriesReaderWithoutValueFilterList.get(seriesIndex)) {
+ if (blockingQueueArray[seriesIndex].remainingCapacity() > 0) {
+ ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(seriesIndex);
+ // if the reader isn't being managed and still has more data,
+ // that means this read task leave the pool before because the queue has no more space
+ // now we should submit it again
+ if (!reader.isManagedByQueryManager() && reader.hasRemaining()) {
+ reader.setManagedByQueryManager(true);
+ pool.submit(new ReadTask(reader, blockingQueueArray[seriesIndex],
+ encoder, this, seriesIndex));
+ }
+ }
+ }
+ }
+
+ // set time buffers, value buffers and bitmap buffers
+ tsQueryNonAlignDataSet.setTimeList(timeBufferList);
+ tsQueryNonAlignDataSet.setValueList(valueBufferList);
+
+ return tsQueryNonAlignDataSet;
+ }
+
+
+ @Override
+ protected boolean hasNextWithoutConstraint() {
+ return false;
+ }
+
+ @Override
+ protected RowRecord nextWithoutConstraint() {
+ return null;
+ }
+
+
+}
diff --cc server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
index bc70fa8,f6582ff..0a10375
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java
@@@ -89,32 -88,6 +89,28 @@@ public class EngineExecutor
throw new StorageEngineException(e.getMessage());
}
}
+
+ public QueryDataSet executeNonAlign(QueryContext context)
+ throws StorageEngineException, IOException {
+
+ Filter timeFilter = null;
+ if (optimizedExpression != null) {
+ timeFilter = ((GlobalTimeExpression) optimizedExpression).getFilter();
+ }
+
+ List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>();
+ for (int i = 0; i < deduplicatedPaths.size(); i++) {
+ Path path = deduplicatedPaths.get(i);
+ TSDataType dataType = deduplicatedDataTypes.get(i);
+
+ ManagedSeriesReader reader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context,
+ true);
+ readersOfSelectedSeries.add(reader);
+ }
+
- try {
- return new NonAlignEngineDataSet(deduplicatedPaths, deduplicatedDataTypes,
- readersOfSelectedSeries);
- } catch (InterruptedException e) {
- throw new StorageEngineException(e.getMessage());
- }
++ return new NonAlignEngineDataSet(deduplicatedPaths, deduplicatedDataTypes,
++ readersOfSelectedSeries);
+ }
/**
* executeWithValueFilter query.
diff --cc server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java
index 4b03cb1,3750746..87157bf
--- a/server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java
@@@ -18,8 -18,8 +18,6 @@@
*/
package org.apache.iotdb.db.rescon;
--import java.util.ArrayDeque;
--import java.util.Deque;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@@ -27,6 -27,6 +25,9 @@@ import org.apache.iotdb.db.engine.memta
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
++import java.util.ArrayDeque;
++import java.util.Deque;
++
public class MemTablePool {
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
diff --cc session/pom.xml
index bc7b83b,8df9ae6..dab30aa
--- a/session/pom.xml
+++ b/session/pom.xml
@@@ -19,125 -19,134 +19,132 @@@
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>iotdb-parent</artifactId>
- <groupId>org.apache.iotdb</groupId>
- <version>0.10.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <artifactId>iotdb-session</artifactId>
- <name>IoTDB Session</name>
- <properties>
- <session.test.skip>false</session.test.skip>
- <session.it.skip>${session.test.skip}</session.it.skip>
- <session.ut.skip>${session.test.skip}</session.ut.skip>
- </properties>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>3.1.0</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <!-- this is used for inheritance merges -->
- <phase>package</phase>
- <!-- bind to the packaging phase -->
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <!--using `mvn test` to run UT, `mvn verify` to run ITs
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>iotdb-parent</artifactId>
+ <groupId>org.apache.iotdb</groupId>
+ <version>0.10.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>iotdb-session</artifactId>
+ <name>IoTDB Session</name>
+ <properties>
+ <session.test.skip>false</session.test.skip>
+ <session.it.skip>${session.test.skip}</session.it.skip>
+ <session.ut.skip>${session.test.skip}</session.ut.skip>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.1.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <!-- this is used for inheritance merges -->
+ <phase>package</phase>
+ <!-- bind to the packaging phase -->
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <!--using `mvn test` to run UT, `mvn verify` to run ITs
- Reference: https://antoniogoncalves.org/2012/12/13/lets-turn-integration-tests-with-maven-to-a-first-class-citizen/-->
+ Reference: https://antoniogoncalves.org/2012/12/13/lets-turn-integration-tests-with-maven-to-a-first-class-citizen/-->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <skipTests>${session.ut.skip}</skipTests>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- <executions>
- <execution>
- <id>run-integration-tests</id>
- <phase>integration-test</phase>
- <goals>
- <goal>integration-test</goal>
- <goal>verify</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <skipTests>${session.test.skip}</skipTests>
- <skipITs>${session.it.skip}</skipITs>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>service-rpc</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>tsfile</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-server</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-server</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.iotdb</groupId>
- <artifactId>iotdb-jdbc</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </dependency>
- </dependencies>
- <profiles>
- <profile>
- <id>skipSessionTests</id>
- <activation>
- <property>
- <name>skipTests</name>
- <value>true</value>
- </property>
- </activation>
- <properties>
- <session.test.skip>true</session.test.skip>
- <session.ut.skip>true</session.ut.skip>
- <session.it.skip>true</session.it.skip>
- </properties>
- </profile>
- <profile>
- <id>skipUT_SessionTests</id>
- <activation>
- <property>
- <name>skipUTs</name>
- <value>true</value>
- </property>
- </activation>
- <properties>
- <session.ut.skip>true</session.ut.skip>
- </properties>
- </profile>
- </profiles>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>${session.ut.skip}</skipTests>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>run-integration-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <skipTests>${session.test.skip}</skipTests>
+ <skipITs>${session.it.skip}</skipITs>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>service-rpc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>tsfile</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>${project.version}</version>
++ <type>test-jar</type>
++ <scope>test</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.iotdb</groupId>
++ <artifactId>iotdb-server</artifactId>
++ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-jdbc</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </dependency>
+ </dependencies>
+ <profiles>
+ <profile>
+ <id>skipSessionTests</id>
+ <activation>
+ <property>
+ <name>skipTests</name>
+ <value>true</value>
+ </property>
+ </activation>
+ <properties>
+ <session.test.skip>true</session.test.skip>
+ <session.ut.skip>true</session.ut.skip>
+ <session.it.skip>true</session.it.skip>
+ </properties>
+ </profile>
+ <profile>
+ <id>skipUT_SessionTests</id>
+ <activation>
+ <property>
+ <name>skipUTs</name>
+ <value>true</value>
+ </property>
+ </activation>
+ <properties>
+ <session.ut.skip>true</session.ut.skip>
+ </properties>
+ </profile>
+ </profiles>
</project>