You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/07/27 12:06:43 UTC
[incubator-iotdb] 01/01: add external sort in time range query
This is an automated email from the ASF dual-hosted git repository.
suyue pushed a commit to branch external_sort
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 10580b120c695cf00f69fae7a29132813e83b300
Author: suyue <23...@qq.com>
AuthorDate: Sat Jul 27 20:05:17 2019 +0800
add external sort in time range query
---
server/iotdb/conf/iotdb-engine.properties | 7 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 33 +++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 +
.../db/query/control/QueryResourceManager.java | 57 ++++--
.../db/query/externalsort/ExternalSortJob.java | 51 +++++
.../query/externalsort/ExternalSortJobEngine.java | 46 +++++
.../db/query/externalsort/ExternalSortJobPart.java | 44 +++++
.../externalsort/ExternalSortJobScheduler.java | 44 +++++
.../iotdb/db/query/externalsort/LineMerger.java | 55 ++++++
.../MultiSourceExternalSortJobPart.java | 61 ++++++
.../externalsort/SimpleExternalSortEngine.java | 113 +++++++++++
.../SingleSourceExternalSortJobPart.java | 38 ++++
.../serialize/TimeValuePairDeserializer.java | 32 ++++
.../serialize/TimeValuePairSerializer.java | 30 +++
.../impl/FixLengthTimeValuePairDeserializer.java | 208 +++++++++++++++++++++
.../impl/FixLengthTimeValuePairSerializer.java | 165 ++++++++++++++++
.../impl/SimpleTimeValuePairDeserializer.java | 76 ++++++++
.../impl/SimpleTimeValuePairSerializer.java | 66 +++++++
.../resourceRelated/UnseqResourceMergeReader.java | 30 ++-
.../reader/universal/PriorityMergeReader.java | 21 +++
.../query/reader/universal/PriorityReaderBean.java | 41 ++++
.../query/externalsort/ExternalSortEngineTest.java | 178 ++++++++++++++++++
.../SimpleTimeValuePairSerializerTest.java | 146 +++++++++++++++
.../query/reader/universal/FakedSeriesReader.java | 88 +++++++++
.../reader/universal/PriorityMergeReaderTest.java | 32 ----
.../reader/universal/PriorityMergeReaderTest2.java | 44 +----
.../apache/iotdb/db/utils/EnvironmentUtils.java | 17 +-
27 files changed, 1629 insertions(+), 100 deletions(-)
diff --git a/server/iotdb/conf/iotdb-engine.properties b/server/iotdb/conf/iotdb-engine.properties
index 01dea2c..c939dfd 100644
--- a/server/iotdb/conf/iotdb-engine.properties
+++ b/server/iotdb/conf/iotdb-engine.properties
@@ -186,6 +186,13 @@ stat_monitor_retain_interval_in_second=600
schema_manager_cache_size=300000
####################
+### External sort Configuration
+####################
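+# The threshold for enabling external sort: when the number of readers to be merged in a query
+# reaches this value, they are merged through temporary files on disk instead of in memory.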
+external_sort_threshold = 60
+
+
+####################
### Sync Server Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 86c3622..5c6bdf1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -99,11 +99,16 @@ public class IoTDBConfig {
private String systemDir = "data/system";
/**
- * Schema directory, including storage set of values.
+ * Schema directory, including storage set of values.
*/
private String schemaDir = "data/system/schema";
/**
+ * Query directory, stores temporary files for query
+ */
+ private String queryDir = "data/query";
+
+ /**
* Data directory of data. It can be settled as dataDirs = {"data1", "data2", "data3"};
*/
private String[] dataDirs = {"data/data"};
@@ -181,6 +186,12 @@ public class IoTDBConfig {
private int mManagerCacheSize = 400000;
/**
+ * The threshold of items in external sort. If the number of chunks participating in sorting
+ * exceeds this threshold, external sorting is enabled, otherwise memory sorting is used.
+ */
+ private int externalSortThreshold = 60;
+
+ /**
* Is this IoTDB instance a receiver of sync or not.
*/
private boolean isSyncEnable = true;
@@ -253,6 +264,7 @@ public class IoTDBConfig {
dirs.add(schemaDir);
dirs.add(walFolder);
dirs.add(indexFileDir);
+ dirs.add(queryDir);
dirs.addAll(Arrays.asList(dataDirs));
String homeDir = System.getProperty(IoTDBConstant.IOTDB_HOME, null);
@@ -272,8 +284,9 @@ public class IoTDBConfig {
schemaDir = dirs.get(2);
walFolder = dirs.get(3);
indexFileDir = dirs.get(4);
+ queryDir = dirs.get(5);
for (int i = 0; i < dataDirs.length; i++) {
- dataDirs[i] = dirs.get(i + 5);
+ dataDirs[i] = dirs.get(i + 6);
}
}
@@ -363,6 +376,14 @@ public class IoTDBConfig {
this.schemaDir = schemaDir;
}
+ public String getQueryDir() {
+ return queryDir;
+ }
+
+ public void setQueryDir(String queryDir) {
+ this.queryDir = queryDir;
+ }
+
public String getWalFolder() {
return walFolder;
}
@@ -587,6 +608,14 @@ public class IoTDBConfig {
this.allocateMemoryForRead = allocateMemoryForRead;
}
+ public int getExternalSortThreshold() {
+ return externalSortThreshold;
+ }
+
+ public void setExternalSortThreshold(int externalSortThreshold) {
+ this.externalSortThreshold = externalSortThreshold;
+ }
+
public boolean isEnablePerformanceStat() {
return enablePerformanceStat;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index c916114..3e732cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -141,6 +141,8 @@ public class IoTDBDescriptor {
conf.setSchemaDir(FilePathUtils.regularizePath(conf.getSystemDir()) + "schema");
+ conf.setQueryDir(FilePathUtils.regularizePath(conf.getBaseDir()) + "query");
+
conf.setDataDirs(properties.getProperty("data_dirs", conf.getDataDirs()[0])
.split(","));
@@ -224,6 +226,10 @@ public class IoTDBDescriptor {
conf.setZoneID(ZoneId.of(tmpTimeZone.trim()));
logger.info("Time zone has been set to {}", conf.getZoneID());
+ conf.setExternalSortThreshold(Integer.parseInt(properties
+ .getProperty("external_sort_threshold",
+ Integer.toString(conf.getExternalSortThreshold()))));
+
conf.setEnablePerformanceStat(Boolean
.parseBoolean(properties.getProperty("enable_performance_stat",
Boolean.toString(conf.isEnablePerformanceStat())).trim()));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index be81daf..3f3b818 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.query.control;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -29,6 +30,8 @@ import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairDeserializer;
+import org.apache.iotdb.db.query.externalsort.serialize.impl.FixLengthTimeValuePairDeserializer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
@@ -37,14 +40,12 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
/**
* <p>
- * QueryResourceManager manages resource (file streams) used by each query job, and assign Ids to the jobs.
- * During the life cycle of a query, the following methods must be called in strict order:
- * 1. assignJobId - get an Id for the new job.
- * 2. beginQueryOfGivenQueryPaths - remind StorageEngine that some files are being used
- * 3. (if using filter)beginQueryOfGivenExpression
- * - remind StorageEngine that some files are being used
- * 4. getQueryDataSource - open files for the job or reuse existing readers.
- * 5. endQueryForGivenJob - putBack the resource used by this job.
+ * QueryResourceManager manages resource (file streams) used by each query job, and assign Ids to
+ * the jobs. During the life cycle of a query, the following methods must be called in strict order:
+ * 1. assignJobId - get an Id for the new job. 2. beginQueryOfGivenQueryPaths - remind StorageEngine
+ * that some files are being used 3. (if using filter)beginQueryOfGivenExpression - remind
+ * StorageEngine that some files are being used 4. getQueryDataSource - open files for the job or
+ * reuse existing readers. 5. endQueryForGivenJob - putBack the resource used by this job.
* </p>
*/
public class QueryResourceManager {
@@ -75,7 +76,8 @@ public class QueryResourceManager {
* returns result token `3` and `4` .
*
* <code>StorageEngine.getInstance().endQueryForGivenJob(device_1, 1)</code> and
- * <code>StorageEngine.getInstance().endQueryForGivenJob(device_2, 2)</code> must be invoked no matter how
+ * <code>StorageEngine.getInstance().endQueryForGivenJob(device_2, 2)</code> must be invoked no
+ * matter how
* query process Q1 exits normally or abnormally. So is Q2,
* <code>StorageEngine.getInstance().endQueryForGivenJob(device_1, 3)</code> and
* <code>StorageEngine.getInstance().endQueryForGivenJob(device_2, 4)</code> must be invoked
@@ -88,10 +90,16 @@ public class QueryResourceManager {
private ConcurrentHashMap<Long, ConcurrentHashMap<String, List<Integer>>> queryTokensMap;
private JobFileManager filePathsManager;
private AtomicLong maxJobId;
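+ /**
+ * Maps a query job id to the deserializers of the temporary files that external sort generated
+ * for that job, so that they can be closed when the job ends.
+ */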
+ private ConcurrentHashMap<Long, List<TimeValuePairDeserializer>> externalSortFileMap;
+
private QueryResourceManager() {
queryTokensMap = new ConcurrentHashMap<>();
filePathsManager = new JobFileManager();
maxJobId = new AtomicLong(0);
+ externalSortFileMap = new ConcurrentHashMap<>();
}
public static QueryResourceManager getInstance() {
@@ -99,8 +107,8 @@ public class QueryResourceManager {
}
/**
- * Assign a jobId for a new query job. When a query request is created firstly, this method
- * must be invoked.
+ * Assign a jobId for a new query job. When a query request is created firstly, this method must
+ * be invoked.
*/
public long assignJobId() {
long jobId = maxJobId.incrementAndGet();
@@ -141,8 +149,9 @@ public class QueryResourceManager {
/**
* Begin query and set query tokens of all filter paths in expression. This method is used in
* filter calculation.
- * @param remoteDeviceIdSet device id set which can not handle locally
- * Note : the method is for cluster
+ *
+ * @param remoteDeviceIdSet device id set which can not handle locally Note : the method is for
+ * cluster
*/
public void beginQueryOfGivenExpression(long jobId, IExpression expression,
Set<String> remoteDeviceIdSet) throws StorageEngineException {
@@ -155,6 +164,16 @@ public class QueryResourceManager {
}
}
+ /**
+ * register temporary file generated by external sort for resource release.
+ *
+ * @param jobId query job id
+ * @param deserializer deserializer of temporary file in external sort.
+ */
+ public void registerTempExternalSortFile(long jobId, TimeValuePairDeserializer deserializer) {
+ externalSortFileMap.computeIfAbsent(jobId, x -> new ArrayList<>()).add(deserializer);
+ }
+
public QueryDataSource getQueryDataSource(Path selectedPath,
QueryContext context) throws StorageEngineException {
@@ -174,6 +193,18 @@ public class QueryResourceManager {
* query tokens created by this jdbc request must be cleared.
*/
public void endQueryForGivenJob(long jobId) throws StorageEngineException {
+ // close file stream of external sort files, and delete
+ if (externalSortFileMap.get(jobId) != null) {
+ for (TimeValuePairDeserializer deserializer : externalSortFileMap.get(jobId)) {
+ try {
+ deserializer.close();
+ } catch (IOException e) {
+ throw new StorageEngineException(e);
+ }
+ }
+ externalSortFileMap.remove(jobId);
+ }
+
if (queryTokensMap.get(jobId) == null) {
// no resource need to be released.
return;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJob.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJob.java
new file mode 100644
index 0000000..f6c9761
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJob.java
@@ -0,0 +1,51 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.query.reader.IPointReader;
+
+/**
+ * This class represents an external sort job. Every job will use a separated directory.
+ */
+public class ExternalSortJob {
+
+ private long jobId;
+ private List<ExternalSortJobPart> partList;
+
+ public ExternalSortJob(long jobId, List<ExternalSortJobPart> partList) {
+ this.jobId = jobId;
+ this.partList = partList;
+ }
+
+ public List<IPointReader> executeWithGlobalTimeFilter() throws IOException {
+ List<IPointReader> readers = new ArrayList<>();
+ for (ExternalSortJobPart part : partList) {
+ readers.add(part.executeWithGlobalTimeFilter());
+ }
+ return readers;
+ }
+
+ public long getJobId() {
+ return jobId;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobEngine.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobEngine.java
new file mode 100644
index 0000000..f9f8b11
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobEngine.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.query.reader.IPointReader;
+
+
+/**
+ * Entry point of external sort: turns a list of point readers into a (possibly shorter) list of
+ * readers by merging them according to an implementation-specific strategy.
+ */
+public interface ExternalSortJobEngine {
+
+  /**
+   * Takes a list of readers, decides whether they should be processed with external sort and, if
+   * so, merge-sorts them using the strategy of the implementation.
+   *
+   * @param queryId id of the query the readers belong to
+   * @param timeValuePairReaderList the readers to be sorted
+   * @param startPriority priority associated with the first reader of the list
+   * @return the readers to use in place of the given ones
+   * @throws IOException if the sort fails
+   */
+  List<IPointReader> executeWithGlobalTimeFilter(
+      long queryId, List<IPointReader> timeValuePairReaderList, int startPriority)
+      throws IOException;
+
+  /**
+   * Builds an external sort job, which consists of many parts, for the given readers.
+   *
+   * @param queryId id of the query the readers belong to
+   * @param timeValuePairReaderList the readers to be sorted
+   * @param startPriority priority associated with the first reader of the list
+   * @return the job; nothing in this contract says it has been executed
+   * @throws IOException if the job cannot be created
+   */
+  ExternalSortJob createJob(
+      long queryId, List<IPointReader> timeValuePairReaderList, int startPriority)
+      throws IOException;
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobPart.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobPart.java
new file mode 100644
index 0000000..a783aaa
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobPart.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort;
+
+import java.io.IOException;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+
+
+/**
+ * Base class of one step of an external sort job. Executing a part yields a
+ * {@link PriorityMergeReader}; every part carries a type tag telling whether it has a single
+ * source or multiple sources.
+ */
+public abstract class ExternalSortJobPart {
+
+  /** The kinds of job parts. */
+  public enum ExternalSortJobPartType {
+    SINGLE_SOURCE, MULTIPLE_SOURCE
+  }
+
+  private final ExternalSortJobPartType type;
+
+  public ExternalSortJobPart(ExternalSortJobPartType type) {
+    this.type = type;
+  }
+
+  public ExternalSortJobPartType getType() {
+    return type;
+  }
+
+  /**
+   * Runs this part.
+   *
+   * @return the reader produced by this part
+   * @throws IOException if running the part fails
+   */
+  public abstract PriorityMergeReader executeWithGlobalTimeFilter() throws IOException;
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobScheduler.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobScheduler.java
new file mode 100644
index 0000000..20d16bb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobScheduler.java
@@ -0,0 +1,44 @@
+ /**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.iotdb.db.query.externalsort;
+
+
+ /**
+  * Process-wide generator of external sort job ids. Obtain the shared instance through
+  * {@link #getInstance()}.
+  */
+ public class ExternalSortJobScheduler {
+
+   // last id handed out; the first generated id is 1
+   private long jobId = 0;
+
+   private ExternalSortJobScheduler() {
+     // instances are only created by the holder class below
+   }
+
+   public static ExternalSortJobScheduler getInstance() {
+     return ExternalSortJobSchedulerHelper.INSTANCE;
+   }
+
+   /**
+    * Produces the next job id. The method is synchronized, so concurrent callers never receive
+    * the same id.
+    *
+    * @return the previous id plus one
+    */
+   public synchronized long genJobId() {
+     return ++jobId;
+   }
+
+   // holder class: the instance is created when this class is first initialized
+   private static class ExternalSortJobSchedulerHelper {
+
+     private static final ExternalSortJobScheduler INSTANCE = new ExternalSortJobScheduler();
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/LineMerger.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/LineMerger.java
new file mode 100644
index 0000000..cf5d1d6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/LineMerger.java
@@ -0,0 +1,55 @@
+ /**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.iotdb.db.query.externalsort;
+
+ import java.io.IOException;
+ import java.util.List;
+ import org.apache.iotdb.db.query.control.QueryResourceManager;
+ import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairDeserializer;
+ import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairSerializer;
+ import org.apache.iotdb.db.query.externalsort.serialize.impl.FixLengthTimeValuePairDeserializer;
+ import org.apache.iotdb.db.query.externalsort.serialize.impl.FixLengthTimeValuePairSerializer;
+ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+
+
+ /**
+  * Merges several priority readers into one temporary file and exposes that file as a single
+  * reader.
+  */
+ public class LineMerger {
+
+   private final String tmpFilePath;
+   private final long queryId;
+
+   /**
+    * @param queryId id of the query the temporary file is registered under
+    * @param tmpFilePath path of the temporary file that receives the merged data
+    */
+   public LineMerger(long queryId, String tmpFilePath) {
+     this.tmpFilePath = tmpFilePath;
+     this.queryId = queryId;
+   }
+
+   /**
+    * Drains the merged view of the given readers into the temporary file, then reopens the file
+    * through a deserializer that is registered with {@link QueryResourceManager} under the
+    * query id of this merger.
+    *
+    * @param prioritySeriesReaders the readers to merge; the returned reader takes over the
+    *     priority of the first one, so the list must not be empty
+    * @return a reader over the temporary file
+    * @throws IOException if reading the sources or writing/reopening the temporary file fails
+    */
+   public PriorityMergeReader merge(List<PriorityMergeReader> prioritySeriesReaders)
+       throws IOException {
+     TimeValuePairSerializer serializer = new FixLengthTimeValuePairSerializer(tmpFilePath);
+     try {
+       PriorityMergeReader reader = new PriorityMergeReader(prioritySeriesReaders);
+       try {
+         while (reader.hasNext()) {
+           serializer.write(reader.next());
+         }
+       } finally {
+         // release the source readers even when the merge fails half way
+         reader.close();
+       }
+     } finally {
+       // always release the output stream, otherwise a failed merge leaks the file handle
+       serializer.close();
+     }
+     TimeValuePairDeserializer deserializer = new FixLengthTimeValuePairDeserializer(tmpFilePath);
+     // registration lets the query resource manager close the deserializer when the query ends
+     QueryResourceManager.getInstance().registerTempExternalSortFile(queryId, deserializer);
+     return new PriorityMergeReader(deserializer, prioritySeriesReaders.get(0).getPriority());
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/MultiSourceExternalSortJobPart.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/MultiSourceExternalSortJobPart.java
new file mode 100644
index 0000000..76bbfbf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/MultiSourceExternalSortJobPart.java
@@ -0,0 +1,61 @@
+ /**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.iotdb.db.query.externalsort;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.List;
+ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+
+
+ /**
+  * A job part that merges the results of several other parts into one temporary file by means
+  * of a {@link LineMerger}.
+  */
+ public class MultiSourceExternalSortJobPart extends ExternalSortJobPart {
+
+   private final String tmpFilePath;
+   private final List<ExternalSortJobPart> source;
+   private final long queryId;
+
+   /**
+    * @param queryId id of the query this part belongs to; handed on to the {@link LineMerger}
+    * @param tmpFilePath path of the temporary file that receives the merged data
+    * @param source the parts whose results are merged
+    */
+   public MultiSourceExternalSortJobPart(long queryId, String tmpFilePath,
+       List<ExternalSortJobPart> source) {
+     super(ExternalSortJobPartType.MULTIPLE_SOURCE);
+     this.source = source;
+     this.tmpFilePath = tmpFilePath;
+     this.queryId = queryId;
+   }
+
+   /**
+    * Same as the list based constructor, with the source parts given as varargs.
+    *
+    * @param queryId id of the query this part belongs to; handed on to the {@link LineMerger}
+    * @param tmpFilePath path of the temporary file that receives the merged data
+    * @param externalSortJobParts the parts whose results are merged
+    */
+   public MultiSourceExternalSortJobPart(long queryId, String tmpFilePath,
+       ExternalSortJobPart... externalSortJobParts) {
+     super(ExternalSortJobPartType.MULTIPLE_SOURCE);
+     List<ExternalSortJobPart> parts = new ArrayList<>();
+     for (ExternalSortJobPart externalSortJobPart : externalSortJobParts) {
+       parts.add(externalSortJobPart);
+     }
+     this.source = parts;
+     this.tmpFilePath = tmpFilePath;
+     // previously left unset (0), which made the merger use query id 0 instead of the real one
+     this.queryId = queryId;
+   }
+
+   /**
+    * Executes every source part and merges the resulting readers into the temporary file.
+    *
+    * @return the reader returned by the {@link LineMerger}
+    * @throws IOException if a source part or the merge fails
+    */
+   @Override
+   public PriorityMergeReader executeWithGlobalTimeFilter() throws IOException {
+     List<PriorityMergeReader> prioritySeriesReaders = new ArrayList<>();
+     for (ExternalSortJobPart part : source) {
+       prioritySeriesReaders.add(part.executeWithGlobalTimeFilter());
+     }
+     LineMerger merger = new LineMerger(queryId, tmpFilePath);
+     return merger.merge(prioritySeriesReaders);
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/SimpleExternalSortEngine.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/SimpleExternalSortEngine.java
new file mode 100644
index 0000000..ab94fc2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/SimpleExternalSortEngine.java
@@ -0,0 +1,113 @@
+ /**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.iotdb.db.query.externalsort;
+
+ import java.io.File;
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.List;
+ import org.apache.commons.io.FileUtils;
+ import org.apache.iotdb.db.conf.IoTDBDescriptor;
+ import org.apache.iotdb.db.exception.StorageEngineFailureException;
+ import org.apache.iotdb.db.query.reader.IPointReader;
+ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+
+
+ /**
+  * External sort engine that groups readers into a tree of merge parts: as long as the number
+  * of parts reaches the configured threshold, consecutive parts are grouped and every group is
+  * merged into one temporary file.
+  */
+ public class SimpleExternalSortEngine implements ExternalSortJobEngine {
+
+   private ExternalSortJobScheduler scheduler;
+
+   private String queryDir;
+   private int minExternalSortSourceCount;
+
+   private SimpleExternalSortEngine() {
+     queryDir = IoTDBDescriptor.getInstance().getConfig().getQueryDir();
+     minExternalSortSourceCount = IoTDBDescriptor.getInstance().getConfig()
+         .getExternalSortThreshold();
+     scheduler = ExternalSortJobScheduler.getInstance();
+
+     // create queryDir
+     try {
+       FileUtils.forceMkdir(new File(queryDir));
+     } catch (IOException e) {
+       // NOTE(review): the cause is dropped here; pass it on if StorageEngineFailureException
+       // offers a constructor that accepts one - confirm against that class.
+       throw new StorageEngineFailureException("create system directory failed!");
+     }
+   }
+
+   /**
+    * Constructor used in tests. Unlike the singleton constructor it does not create the
+    * directory.
+    *
+    * @param queryDir directory that receives the temporary files
+    * @param minExternalSortSourceCount number of readers from which external sort is used
+    */
+   public SimpleExternalSortEngine(String queryDir, int minExternalSortSourceCount) {
+     this.queryDir = queryDir;
+     this.minExternalSortSourceCount = minExternalSortSourceCount;
+     scheduler = ExternalSortJobScheduler.getInstance();
+   }
+
+   /**
+    * Returns the readers untouched when there are fewer of them than the threshold; otherwise
+    * creates a job for them and executes it.
+    *
+    * @param queryId id of the query the readers belong to
+    * @param readers the readers to be sorted
+    * @param startPriority priority of the first reader; each following reader gets the next one
+    * @return the given list, or the readers produced by the external sort job
+    * @throws IOException if creating or executing the job fails
+    */
+   @Override
+   public List<IPointReader> executeWithGlobalTimeFilter(long queryId, List<IPointReader> readers,
+       int startPriority)
+       throws IOException {
+     if (readers.size() < minExternalSortSourceCount) {
+       return readers;
+     }
+     ExternalSortJob job = createJob(queryId, readers, startPriority);
+     return job.executeWithGlobalTimeFilter();
+   }
+
+   /**
+    * Wraps every reader into a single-source part and then repeatedly groups consecutive parts
+    * into multi-source parts until fewer parts than the group size remain.
+    *
+    * @param queryId id of the query the readers belong to
+    * @param readers the readers to be sorted
+    * @param startPriority priority of the first reader; each following reader gets the next one
+    * @return a job holding the remaining top-level parts
+    * @throws IOException declared by the interface
+    */
+   //TODO: this method could be optimized to have a better performance
+   @Override
+   public ExternalSortJob createJob(long queryId, List<IPointReader> readers, int startPriority)
+       throws IOException {
+     long jobId = scheduler.genJobId();
+     // A group must hold at least two parts: with groups of one (or zero) parts the part list
+     // never shrinks and the loop below would never terminate.
+     int groupSize = Math.max(2, minExternalSortSourceCount);
+
+     List<ExternalSortJobPart> ret = new ArrayList<>();
+     for (IPointReader reader : readers) {
+       ret.add(
+           new SingleSourceExternalSortJobPart(new PriorityMergeReader(reader, startPriority++)));
+     }
+
+     int partId = 0;
+     while (ret.size() >= groupSize) {
+       List<ExternalSortJobPart> nextLevel = new ArrayList<>();
+       for (int from = 0; from < ret.size(); from += groupSize) {
+         int to = Math.min(from + groupSize, ret.size());
+         List<ExternalSortJobPart> partGroup = new ArrayList<>(ret.subList(from, to));
+         // resolve the file name against queryDir so that the temporary file is created inside
+         // the directory, whether or not queryDir ends with a separator
+         String tmpFilePath = new File(queryDir, jobId + "_" + partId).getPath();
+         nextLevel.add(new MultiSourceExternalSortJobPart(queryId, tmpFilePath, partGroup));
+         partId++;
+       }
+       ret = nextLevel;
+     }
+     return new ExternalSortJob(jobId, ret);
+   }
+
+   // holder class: the engine is created when this class is first initialized
+   private static class SimpleExternalSortJobEngineHelper {
+
+     private static final SimpleExternalSortEngine INSTANCE = new SimpleExternalSortEngine();
+   }
+
+   public static SimpleExternalSortEngine getInstance() {
+     return SimpleExternalSortJobEngineHelper.INSTANCE;
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/SingleSourceExternalSortJobPart.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/SingleSourceExternalSortJobPart.java
new file mode 100644
index 0000000..fe278b1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/SingleSourceExternalSortJobPart.java
@@ -0,0 +1,38 @@
+ /**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.iotdb.db.query.externalsort;
+
+ import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+
+
+ /**
+  * A job part backed by one already existing reader: executing it simply hands that reader
+  * back.
+  */
+ public class SingleSourceExternalSortJobPart extends ExternalSortJobPart {
+
+   private final PriorityMergeReader timeValuePairReader;
+
+   /**
+    * @param timeValuePairReader the reader returned by {@link #executeWithGlobalTimeFilter()}
+    */
+   public SingleSourceExternalSortJobPart(PriorityMergeReader timeValuePairReader) {
+     super(ExternalSortJobPartType.SINGLE_SOURCE);
+     this.timeValuePairReader = timeValuePairReader;
+   }
+
+   /**
+    * @return the reader given to the constructor, unchanged
+    */
+   @Override
+   public PriorityMergeReader executeWithGlobalTimeFilter() {
+     return timeValuePairReader;
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairDeserializer.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairDeserializer.java
new file mode 100644
index 0000000..ab11325
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairDeserializer.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort.serialize;
+
+import java.io.IOException;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+
+/**
+ * An {@link IPointReader} used by the external sort deserializers. Unless an implementation
+ * overrides it, {@link #current()} is not supported.
+ */
+public interface TimeValuePairDeserializer extends IPointReader {
+
+  /**
+   * Default implementation that always fails.
+   *
+   * @throws IOException always, unless the implementing class overrides this method
+   */
+  @Override
+  default TimeValuePair current() throws IOException {
+    String message = "TimeValuePairDeserializer doesn't implement current() method.";
+    throw new IOException(message);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairSerializer.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairSerializer.java
new file mode 100644
index 0000000..f9dd888
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairSerializer.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort.serialize;
+
+import java.io.IOException;
+import org.apache.iotdb.db.utils.TimeValuePair;
+
+/**
+ * Sink for {@link TimeValuePair}s. It extends {@link java.io.Closeable} so that implementations
+ * can be managed with try-with-resources.
+ */
+public interface TimeValuePairSerializer extends java.io.Closeable {
+
+  /**
+   * Writes one pair to this serializer.
+   *
+   * @param timeValuePair the pair to write
+   * @throws IOException if the pair cannot be written
+   */
+  void write(TimeValuePair timeValuePair) throws IOException;
+
+  /**
+   * Closes this serializer.
+   *
+   * @throws IOException if closing fails
+   */
+  @Override
+  void close() throws IOException;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairDeserializer.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairDeserializer.java
new file mode 100644
index 0000000..8abd7e1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairDeserializer.java
@@ -0,0 +1,208 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort.serialize.impl;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairDeserializer;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+
+/**
+ * Reads {@link TimeValuePair}s back from an external sort temporary file written in the
+ * fixed-length format, and deletes that file when closed.
+ *
+ * <p>FileFormat: [Header][Body]. [Header] = [DataTypeLength] + [DataTypeInStringBytes], where
+ * [DataTypeLength] = 4 bytes. Each body record is an 8-byte timestamp followed by the value:
+ * 1 byte for BOOLEAN, 4 bytes for INT32/FLOAT, 8 bytes for INT64/DOUBLE, and a 4-byte length
+ * followed by that many bytes for TEXT.
+ */
+public class FixLengthTimeValuePairDeserializer implements TimeValuePairDeserializer {
+
+  private final TimeValuePairReader reader;
+  private final InputStream inputStream;
+  private final String tmpFilePath;
+
+  /**
+   * Opens the temporary file and reads its header to select the record decoder.
+   *
+   * @param tmpFilePath path of the temporary file to read
+   * @throws IOException if the file cannot be opened or its header is truncated or corrupted
+   */
+  public FixLengthTimeValuePairDeserializer(String tmpFilePath) throws IOException {
+    this.tmpFilePath = tmpFilePath;
+    this.inputStream = new BufferedInputStream(new FileInputStream(tmpFilePath));
+    TimeValuePairReader headerReader;
+    try {
+      headerReader = createReader(readHeader());
+    } catch (IOException | RuntimeException e) {
+      // The object is never handed to the caller, so nobody else could release the file handle.
+      try {
+        inputStream.close();
+      } catch (IOException closeError) {
+        e.addSuppressed(closeError);
+      }
+      throw e;
+    }
+    this.reader = headerReader;
+  }
+
+  /**
+   * @return true if at least one more byte can be read from the file
+   */
+  @Override
+  public boolean hasNext() throws IOException {
+    // available() is only an estimate, so probe for end of file by reading one byte and
+    // rewinding; the stream is a BufferedInputStream, which supports mark/reset.
+    inputStream.mark(1);
+    int nextByte = inputStream.read();
+    inputStream.reset();
+    return nextByte != -1;
+  }
+
+  /**
+   * @return the next record decoded from the file
+   * @throws IOException if the file ends in the middle of a record or cannot be read
+   */
+  @Override
+  public TimeValuePair next() throws IOException {
+    return reader.read(inputStream);
+  }
+
+  /**
+   * Closes the stream and deletes the temporary file if it still exists.
+   *
+   * @throws IOException if closing the stream fails or the file cannot be deleted
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      inputStream.close();
+    } finally {
+      // Remove the tmp file even when closing the stream failed.
+      File file = new File(tmpFilePath);
+      if (file.exists() && !file.delete()) {
+        throw new IOException("Delete external sort tmp file error. FilePath:" + tmpFilePath);
+      }
+    }
+  }
+
+  public String getTmpFilePath() {
+    return tmpFilePath;
+  }
+
+  /**
+   * Reads the 4-byte length and the data type name that follow it.
+   */
+  private TSDataType readHeader() throws IOException {
+    byte[] lengthInBytes = new byte[4];
+    readFully(inputStream, lengthInBytes);
+    int length = BytesUtils.bytesToInt(lengthInBytes);
+    if (length < 0) {
+      throw new IOException("Corrupted external sort tmp file header. FilePath:" + tmpFilePath);
+    }
+    byte[] typeInBytes = new byte[length];
+    readFully(inputStream, typeInBytes);
+    return TSDataType.valueOf(BytesUtils.bytesToString(typeInBytes));
+  }
+
+  /**
+   * Fills the whole buffer from the stream. A single read(byte[]) call may return fewer bytes
+   * than requested, so the call is repeated until the buffer is full.
+   *
+   * @throws java.io.EOFException if the stream ends before the buffer is full
+   */
+  private static void readFully(InputStream in, byte[] buffer) throws IOException {
+    int offset = 0;
+    while (offset < buffer.length) {
+      int count = in.read(buffer, offset, buffer.length - offset);
+      if (count < 0) {
+        throw new java.io.EOFException("Unexpected end of external sort tmp file: expected "
+            + buffer.length + " bytes but only got " + offset);
+      }
+      offset += count;
+    }
+  }
+
+  /**
+   * Selects the record decoder matching the data type stored in the header.
+   */
+  private static TimeValuePairReader createReader(TSDataType type) {
+    switch (type) {
+      case BOOLEAN:
+        return new TimeValuePairReader.BooleanReader();
+      case INT32:
+        return new TimeValuePairReader.IntReader();
+      case INT64:
+        return new TimeValuePairReader.LongReader();
+      case FLOAT:
+        return new TimeValuePairReader.FloatReader();
+      case DOUBLE:
+        return new TimeValuePairReader.DoubleReader();
+      case TEXT:
+        return new TimeValuePairReader.BinaryReader();
+      default:
+        throw new RuntimeException("Unknown TSDataType in FixLengthTimeValuePairDeserializer:"
+            + type);
+    }
+  }
+
+  /**
+   * Decoder for one record type. Instances reuse their byte buffers between calls.
+   */
+  private abstract static class TimeValuePairReader {
+
+    private final byte[] timestampBytes = new byte[8];
+
+    public abstract TimeValuePair read(InputStream inputStream) throws IOException;
+
+    /**
+     * Reads the 8-byte timestamp that starts every record.
+     */
+    long readTimestamp(InputStream inputStream) throws IOException {
+      readFully(inputStream, timestampBytes);
+      return BytesUtils.bytesToLong(timestampBytes);
+    }
+
+    private static class BooleanReader extends TimeValuePairReader {
+
+      private final byte[] valueBytes = new byte[1];
+
+      @Override
+      public TimeValuePair read(InputStream inputStream) throws IOException {
+        long timestamp = readTimestamp(inputStream);
+        readFully(inputStream, valueBytes);
+        return new TimeValuePair(timestamp,
+            new TsPrimitiveType.TsBoolean(BytesUtils.bytesToBool(valueBytes)));
+      }
+    }
+
+    private static class IntReader extends TimeValuePairReader {
+
+      private final byte[] valueBytes = new byte[4];
+
+      @Override
+      public TimeValuePair read(InputStream inputStream) throws IOException {
+        long timestamp = readTimestamp(inputStream);
+        readFully(inputStream, valueBytes);
+        return new TimeValuePair(timestamp,
+            new TsPrimitiveType.TsInt(BytesUtils.bytesToInt(valueBytes)));
+      }
+    }
+
+    private static class LongReader extends TimeValuePairReader {
+
+      private final byte[] valueBytes = new byte[8];
+
+      @Override
+      public TimeValuePair read(InputStream inputStream) throws IOException {
+        long timestamp = readTimestamp(inputStream);
+        readFully(inputStream, valueBytes);
+        return new TimeValuePair(timestamp,
+            new TsPrimitiveType.TsLong(BytesUtils.bytesToLong(valueBytes)));
+      }
+    }
+
+    private static class FloatReader extends TimeValuePairReader {
+
+      private final byte[] valueBytes = new byte[4];
+
+      @Override
+      public TimeValuePair read(InputStream inputStream) throws IOException {
+        long timestamp = readTimestamp(inputStream);
+        readFully(inputStream, valueBytes);
+        return new TimeValuePair(timestamp,
+            new TsPrimitiveType.TsFloat(BytesUtils.bytesToFloat(valueBytes)));
+      }
+    }
+
+    private static class DoubleReader extends TimeValuePairReader {
+
+      private final byte[] valueBytes = new byte[8];
+
+      @Override
+      public TimeValuePair read(InputStream inputStream) throws IOException {
+        long timestamp = readTimestamp(inputStream);
+        readFully(inputStream, valueBytes);
+        return new TimeValuePair(timestamp,
+            new TsPrimitiveType.TsDouble(BytesUtils.bytesToDouble(valueBytes)));
+      }
+    }
+
+    private static class BinaryReader extends TimeValuePairReader {
+
+      private final byte[] valueLength = new byte[4];
+
+      @Override
+      public TimeValuePair read(InputStream inputStream) throws IOException {
+        long timestamp = readTimestamp(inputStream);
+        readFully(inputStream, valueLength);
+        int length = BytesUtils.bytesToInt(valueLength);
+        if (length < 0) {
+          // A negative length can only come from a corrupted file.
+          throw new IOException("Corrupted external sort tmp file: negative value length "
+              + length);
+        }
+        byte[] valueBytes = new byte[length];
+        readFully(inputStream, valueBytes);
+        return new TimeValuePair(timestamp,
+            new TsPrimitiveType.TsBinary(new Binary(BytesUtils.bytesToString(valueBytes))));
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairSerializer.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairSerializer.java
new file mode 100644
index 0000000..b253764
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairSerializer.java
@@ -0,0 +1,165 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort.serialize.impl;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairSerializer;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+
+/**
+ * Writes {@link TimeValuePair}s to an external sort temporary file in the fixed-length format.
+ *
+ * <p>IMPORTANT: one instance of this class should be used with a single type of TimeValuePair;
+ * the data type of the first pair written decides the header and the record layout.
+ *
+ * <p>FileFormat: [Header][Body]. [Header] = [DataTypeLength] + [DataTypeInStringBytes], where
+ * [DataTypeLength] = 4 bytes. The header is written lazily, together with the first pair.
+ */
+public class FixLengthTimeValuePairSerializer implements TimeValuePairSerializer {
+
+  private TimeValuePairWriter writer;
+  private final OutputStream outputStream;
+  private boolean dataTypeDefined;
+
+  /**
+   * Prepares the target path and opens the file for writing.
+   *
+   * @param tmpFilePath path of the temporary file to write
+   * @throws IOException if the path cannot be prepared or the file cannot be opened
+   */
+  public FixLengthTimeValuePairSerializer(String tmpFilePath) throws IOException {
+    checkPath(tmpFilePath);
+    outputStream = new BufferedOutputStream(new FileOutputStream(tmpFilePath));
+  }
+
+  /**
+   * Appends one pair; the first call also writes the header for the pair's data type.
+   */
+  @Override
+  public void write(TimeValuePair timeValuePair) throws IOException {
+    if (!dataTypeDefined) {
+      TSDataType dataType = timeValuePair.getValue().getDataType();
+      setWriter(dataType);
+      writeHeader(dataType);
+      dataTypeDefined = true;
+    }
+    writer.write(timeValuePair, outputStream);
+  }
+
+  @Override
+  public void close() throws IOException {
+    outputStream.close();
+  }
+
+  private void writeHeader(TSDataType dataType) throws IOException {
+    // The length prefix must count the encoded bytes that follow it, not the characters.
+    byte[] typeInBytes = BytesUtils.stringToBytes(dataType.toString());
+    outputStream.write(BytesUtils.intToBytes(typeInBytes.length));
+    outputStream.write(typeInBytes);
+  }
+
+  /**
+   * Removes a stale file at the target path and makes sure the parent directory exists. The
+   * file itself is created when the output stream is opened.
+   *
+   * @throws IOException if the stale file cannot be deleted or the directory cannot be created
+   */
+  private static void checkPath(String tmpFilePath) throws IOException {
+    File file = new File(tmpFilePath);
+    if (file.exists() && !file.delete()) {
+      throw new IOException("Delete stale external sort tmp file error. FilePath:" + tmpFilePath);
+    }
+    File parent = file.getParentFile();
+    // mkdirs() also returns false when the directory already exists, hence the second check.
+    if (parent != null && !parent.mkdirs() && !parent.isDirectory()) {
+      throw new IOException("Create external sort tmp dir error. Dir:" + parent.getPath());
+    }
+  }
+
+  private void setWriter(TSDataType type) {
+    switch (type) {
+      case BOOLEAN:
+        this.writer = new TimeValuePairWriter.BooleanWriter();
+        break;
+      case INT32:
+        this.writer = new TimeValuePairWriter.IntWriter();
+        break;
+      case INT64:
+        this.writer = new TimeValuePairWriter.LongWriter();
+        break;
+      case FLOAT:
+        this.writer = new TimeValuePairWriter.FloatWriter();
+        break;
+      case DOUBLE:
+        this.writer = new TimeValuePairWriter.DoubleWriter();
+        break;
+      case TEXT:
+        this.writer = new TimeValuePairWriter.BinaryWriter();
+        break;
+      default:
+        throw new RuntimeException("Unknown TSDataType in FixLengthTimeValuePairSerializer:"
+            + type);
+    }
+  }
+
+  /**
+   * Encoder for one record type: an 8-byte timestamp followed by the value bytes.
+   */
+  private abstract static class TimeValuePairWriter {
+
+    public abstract void write(TimeValuePair tvPair, OutputStream outputStream) throws IOException;
+
+    private static class BooleanWriter extends TimeValuePairWriter {
+
+      @Override
+      public void write(TimeValuePair tvPair, OutputStream outputStream) throws IOException {
+        outputStream.write(BytesUtils.longToBytes(tvPair.getTimestamp()));
+        outputStream.write(BytesUtils.boolToBytes(tvPair.getValue().getBoolean()));
+      }
+    }
+
+    private static class IntWriter extends TimeValuePairWriter {
+
+      @Override
+      public void write(TimeValuePair tvPair, OutputStream outputStream) throws IOException {
+        outputStream.write(BytesUtils.longToBytes(tvPair.getTimestamp()));
+        outputStream.write(BytesUtils.intToBytes(tvPair.getValue().getInt()));
+      }
+    }
+
+    private static class LongWriter extends TimeValuePairWriter {
+
+      @Override
+      public void write(TimeValuePair tvPair, OutputStream outputStream) throws IOException {
+        outputStream.write(BytesUtils.longToBytes(tvPair.getTimestamp()));
+        outputStream.write(BytesUtils.longToBytes(tvPair.getValue().getLong()));
+      }
+    }
+
+    private static class FloatWriter extends TimeValuePairWriter {
+
+      @Override
+      public void write(TimeValuePair tvPair, OutputStream outputStream) throws IOException {
+        outputStream.write(BytesUtils.longToBytes(tvPair.getTimestamp()));
+        outputStream.write(BytesUtils.floatToBytes(tvPair.getValue().getFloat()));
+      }
+    }
+
+    private static class DoubleWriter extends TimeValuePairWriter {
+
+      @Override
+      public void write(TimeValuePair tvPair, OutputStream outputStream) throws IOException {
+        outputStream.write(BytesUtils.longToBytes(tvPair.getTimestamp()));
+        outputStream.write(BytesUtils.doubleToBytes(tvPair.getValue().getDouble()));
+      }
+    }
+
+    private static class BinaryWriter extends TimeValuePairWriter {
+
+      @Override
+      public void write(TimeValuePair tvPair, OutputStream outputStream) throws IOException {
+        outputStream.write(BytesUtils.longToBytes(tvPair.getTimestamp()));
+        // Encode once and derive the length prefix from the bytes actually written, so the
+        // prefix can never disagree with the payload that follows it.
+        byte[] valueBytes = BytesUtils.stringToBytes(tvPair.getValue()
+            .getBinary().getStringValue());
+        outputStream.write(BytesUtils.intToBytes(valueBytes.length));
+        outputStream.write(valueBytes);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairDeserializer.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairDeserializer.java
new file mode 100644
index 0000000..33a1ff7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairDeserializer.java
@@ -0,0 +1,76 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort.serialize.impl;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairDeserializer;
+import org.apache.iotdb.db.utils.TimeValuePair;
+
+/**
+ * Deserializes {@link TimeValuePair}s from an external sort temporary file using Java object
+ * serialization, and deletes that file when closed.
+ *
+ * <p>NOTE(review): Java-native deserialization must only ever be pointed at files this process
+ * wrote itself; it is unsafe on data from an untrusted source.
+ */
+public class SimpleTimeValuePairDeserializer implements TimeValuePairDeserializer {
+
+  private final InputStream inputStream;
+  private final ObjectInputStream objectInputStream;
+  private final String tmpFilePath;
+
+  /**
+   * Opens the temporary file at the given path.
+   *
+   * @param tmpFilePath path of the temporary file to read
+   * @throws IOException if the file cannot be opened or has no valid serialization header
+   */
+  public SimpleTimeValuePairDeserializer(String tmpFilePath) throws IOException {
+    this.tmpFilePath = tmpFilePath;
+    this.inputStream = new BufferedInputStream(new FileInputStream(tmpFilePath));
+    ObjectInputStream objectStream;
+    try {
+      // The ObjectInputStream constructor reads the stream header and may throw.
+      objectStream = new ObjectInputStream(inputStream);
+    } catch (IOException | RuntimeException e) {
+      // Release the file handle; the caller never receives an object it could close.
+      try {
+        inputStream.close();
+      } catch (IOException closeError) {
+        e.addSuppressed(closeError);
+      }
+      throw e;
+    }
+    this.objectInputStream = objectStream;
+  }
+
+  /**
+   * @return true if the underlying stream reports bytes available
+   */
+  @Override
+  public boolean hasNext() throws IOException {
+    // NOTE(review): available() is documented as an estimate; this relies on it being exact
+    // for a buffered local file stream.
+    return inputStream.available() > 0;
+  }
+
+  @Override
+  public TimeValuePair next() throws IOException {
+    try {
+      return (TimeValuePair) objectInputStream.readUnshared();
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Closes the stream and deletes the temporary file if it still exists.
+   *
+   * @throws IOException if closing the stream fails or the file cannot be deleted
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      objectInputStream.close();
+    } finally {
+      // Remove the tmp file even when closing the stream failed. A file that is already gone
+      // is not an error, matching FixLengthTimeValuePairDeserializer.
+      File file = new File(tmpFilePath);
+      if (file.exists() && !file.delete()) {
+        throw new IOException("Delete external sort tmp file error. FilePath:" + tmpFilePath);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairSerializer.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairSerializer.java
new file mode 100644
index 0000000..42fc5c9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairSerializer.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort.serialize.impl;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairSerializer;
+import org.apache.iotdb.db.utils.TimeValuePair;
+
+/**
+ * Serializes {@link TimeValuePair}s to an external sort temporary file using Java object
+ * serialization.
+ */
+public class SimpleTimeValuePairSerializer implements TimeValuePairSerializer {
+
+  private final ObjectOutputStream objectOutputStream;
+
+  /**
+   * Prepares the target path and opens the file for writing.
+   *
+   * @param tmpFilePath path of the temporary file to write
+   * @throws IOException if the path cannot be prepared or the file cannot be opened
+   */
+  public SimpleTimeValuePairSerializer(String tmpFilePath) throws IOException {
+    checkPath(tmpFilePath);
+    FileOutputStream fileOutputStream = new FileOutputStream(tmpFilePath);
+    ObjectOutputStream objectStream;
+    try {
+      // The ObjectOutputStream constructor writes the stream header and may throw.
+      objectStream = new ObjectOutputStream(new BufferedOutputStream(fileOutputStream));
+    } catch (IOException | RuntimeException e) {
+      // Release the file handle; the caller never receives an object it could close.
+      try {
+        fileOutputStream.close();
+      } catch (IOException closeError) {
+        e.addSuppressed(closeError);
+      }
+      throw e;
+    }
+    this.objectOutputStream = objectStream;
+  }
+
+  /**
+   * Removes a stale file at the target path and makes sure the parent directory exists. The
+   * file itself is created when the output stream is opened.
+   *
+   * @throws IOException if the stale file cannot be deleted or the directory cannot be created
+   */
+  private static void checkPath(String tmpFilePath) throws IOException {
+    File file = new File(tmpFilePath);
+    if (file.exists() && !file.delete()) {
+      throw new IOException("Delete stale external sort tmp file error. FilePath:" + tmpFilePath);
+    }
+    File parent = file.getParentFile();
+    // mkdirs() also returns false when the directory already exists, hence the second check.
+    if (parent != null && !parent.mkdirs() && !parent.isDirectory()) {
+      throw new IOException("Create external sort tmp dir error. Dir:" + parent.getPath());
+    }
+  }
+
+  @Override
+  public void write(TimeValuePair timeValuePair) throws IOException {
+    objectOutputStream.writeUnshared(timeValuePair);
+  }
+
+  @Override
+  public void close() throws IOException {
+    objectOutputStream.close();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
index 0d3eb66..939bb88 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
@@ -19,12 +19,16 @@
package org.apache.iotdb.db.query.reader.resourceRelated;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.query.externalsort.ExternalSortJobEngine;
+import org.apache.iotdb.db.query.externalsort.SimpleExternalSortEngine;
+import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.chunkRelated.DiskChunkReader;
import org.apache.iotdb.db.query.reader.chunkRelated.MemChunkReader;
import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
@@ -54,12 +58,15 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
public class UnseqResourceMergeReader extends PriorityMergeReader {
private Path seriesPath;
+ private long queryId;
public UnseqResourceMergeReader(Path seriesPath, List<TsFileResource> unseqResources,
QueryContext context, Filter filter) throws IOException {
this.seriesPath = seriesPath;
+ this.queryId = context.getJobId();
int priorityValue = 1;
+ List<IPointReader> priorityReaderList = new ArrayList<>();
for (TsFileResource tsFileResource : unseqResources) {
// prepare metaDataList
@@ -109,15 +116,32 @@ public class UnseqResourceMergeReader extends PriorityMergeReader {
ChunkReader chunkReader = filter != null ? new ChunkReaderWithFilter(chunk, filter)
: new ChunkReaderWithoutFilter(chunk);
- addReaderWithPriority(new DiskChunkReader(chunkReader), priorityValue++);
+ priorityReaderList.add(new DiskChunkReader(chunkReader));
}
if (!tsFileResource.isClosed()) {
// create and add MemChunkReader with priority
- addReaderWithPriority(
- new MemChunkReader(tsFileResource.getReadOnlyMemChunk(), filter), priorityValue++);
+ priorityReaderList.add(new MemChunkReader(tsFileResource.getReadOnlyMemChunk(), filter));
}
}
+
+ if (shouldUseExternalSort()) {
+ ExternalSortJobEngine externalSortJobEngine = SimpleExternalSortEngine.getInstance();
+ List<IPointReader> readerList = externalSortJobEngine
+ .executeWithGlobalTimeFilter(queryId, priorityReaderList, priorityValue);
+ for (IPointReader chunkReader : readerList) {
+ addReaderWithPriority(chunkReader, priorityValue++);
+ }
+ } else {
+ for (IPointReader chunkReader : priorityReaderList) {
+ addReaderWithPriority(chunkReader, priorityValue++);
+ }
+ }
+
+ }
+
+ private boolean shouldUseExternalSort() {
+ return false;
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index 106a906..428263e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -34,6 +34,19 @@ public class PriorityMergeReader implements IPointReader {
private List<Integer> priorityList = new ArrayList<>();
private PriorityQueue<Element> heap = new PriorityQueue<>();
+  /**
+   * Creates a merge reader with no sources.
+   */
+  public PriorityMergeReader() {
+    // nothing to initialise
+  }
+
+  /**
+   * Creates a merge reader over one source.
+   *
+   * @param reader the source reader
+   * @param priority the priority registered for that reader
+   */
+  public PriorityMergeReader(IPointReader reader, int priority) throws IOException {
+    addReaderWithPriority(reader, priority);
+  }
+
+  /**
+   * Creates a merge reader over several merge readers, registering each one under the value
+   * its own {@link #getPriority()} reports.
+   *
+   * @param prioritySeriesReaders the source readers, added in list order
+   */
+  public PriorityMergeReader(List<PriorityMergeReader> prioritySeriesReaders) throws IOException {
+    for (int i = 0; i < prioritySeriesReaders.size(); i++) {
+      PriorityMergeReader source = prioritySeriesReaders.get(i);
+      addReaderWithPriority(source, source.getPriority());
+    }
+  }
+
+
public void addReaderWithPriority(IPointReader reader, int priority) throws IOException {
if (reader.hasNext()) {
heap.add(new Element(readerList.size(), reader.next(), priority));
@@ -77,6 +90,14 @@ public class PriorityMergeReader implements IPointReader {
}
}
+  /**
+   * @return the first entry of the priority list, or 0 when that list is empty
+   */
+  public int getPriority() {
+    return priorityList.isEmpty() ? 0 : priorityList.get(0);
+  }
+
protected class Element implements Comparable<Element> {
int index;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityReaderBean.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityReaderBean.java
new file mode 100644
index 0000000..226d5ea
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityReaderBean.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.reader.universal;
+
+import org.apache.iotdb.db.query.reader.IPointReader;
+
+/**
+ * Immutable pairing of a point reader with the priority it was given.
+ */
+public class PriorityReaderBean {
+
+  // Both fields are assigned once in the constructor; there are no setters.
+  private final IPointReader reader;
+  private final int priority;
+
+  /**
+   * @param reader the reader being wrapped
+   * @param priority the priority associated with that reader
+   */
+  public PriorityReaderBean(IPointReader reader, int priority) {
+    this.reader = reader;
+    this.priority = priority;
+  }
+
+  /**
+   * @return the wrapped reader
+   */
+  public IPointReader getReader() {
+    return reader;
+  }
+
+  /**
+   * @return the priority supplied at construction time
+   */
+  public int getPriority() {
+    return priority;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/externalsort/ExternalSortEngineTest.java b/server/src/test/java/org/apache/iotdb/db/query/externalsort/ExternalSortEngineTest.java
new file mode 100644
index 0000000..33b9d22
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/query/externalsort/ExternalSortEngineTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.externalsort;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.universal.FakedSeriesReader;
+import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that merging readers after they have gone through {@link SimpleExternalSortEngine}
+ * yields the same points as merging the same readers directly.
+ */
+public class ExternalSortEngineTest {
+
+  private String baseDir = "externalSortTestTmp";
+  private long queryId = EnvironmentUtils.TEST_QUERY_JOB_ID;
+
+  @After
+  public void after() throws IOException, StorageEngineException {
+    QueryResourceManager.getInstance().endQueryForGivenJob(queryId);
+    deleteDir();
+  }
+
+  @Test
+  public void testSimple() throws IOException {
+    SimpleExternalSortEngine engine = new SimpleExternalSortEngine(baseDir + "/", 2);
+    List<IPointReader> readerList1 = genSimple();
+    List<IPointReader> readerList2 = genSimple();
+    readerList1 = engine.executeWithGlobalTimeFilter(queryId, readerList1, 1);
+    PriorityMergeReader reader1 = mergeWithIndexPriority(readerList1);
+    PriorityMergeReader reader2 = mergeWithIndexPriority(readerList2);
+    check(reader1, reader2);
+    reader1.close();
+    reader2.close();
+  }
+
+  @Test
+  public void testBig() throws IOException {
+    SimpleExternalSortEngine engine = new SimpleExternalSortEngine(baseDir + "/", 50);
+    int lineCount = 100;
+    int valueCount = 10000;
+    List<long[]> data = genData(lineCount, valueCount);
+
+    List<IPointReader> readerList1 = genReaders(data);
+    List<IPointReader> readerList2 = genReaders(data);
+    readerList1 = engine.executeWithGlobalTimeFilter(queryId, readerList1, 1);
+    PriorityMergeReader reader1 = mergeWithIndexPriority(readerList1);
+    PriorityMergeReader reader2 = mergeWithIndexPriority(readerList2);
+
+    check(reader1, reader2);
+    reader1.close();
+    reader2.close();
+  }
+
+  /**
+   * Manual timing comparison. It carries no {@code @Test} annotation, so JUnit does not run it.
+   */
+  public void efficiencyTest() throws IOException {
+    SimpleExternalSortEngine engine = new SimpleExternalSortEngine(baseDir + "/", 50);
+    int lineCount = 100000;
+    int valueCount = 100;
+    List<long[]> data = genData(lineCount, valueCount);
+
+    List<IPointReader> readerList1 = genReaders(data);
+    long startTimestamp = System.currentTimeMillis();
+    readerList1 = engine.executeWithGlobalTimeFilter(queryId, readerList1, 1);
+    PriorityMergeReader reader1 = mergeWithIndexPriority(readerList1);
+    while (reader1.hasNext()) {
+      reader1.next();
+    }
+    System.out.println(
+        "Time used WITH external sort:" + (System.currentTimeMillis() - startTimestamp) + "ms");
+
+    List<IPointReader> readerList2 = genReaders(data);
+    startTimestamp = System.currentTimeMillis();
+    PriorityMergeReader reader2 = mergeWithIndexPriority(readerList2);
+    while (reader2.hasNext()) {
+      reader2.next();
+    }
+    System.out.println(
+        "Time used WITHOUT external sort:" + (System.currentTimeMillis() - startTimestamp) + "ms");
+
+    // NOTE(review): reader1 is left unclosed here, as in the original benchmark; confirm
+    // whether closing it is safe before adding the call.
+    reader2.close();
+  }
+
+  /**
+   * Builds a merge reader in which every source gets its list index as priority.
+   */
+  private PriorityMergeReader mergeWithIndexPriority(List<IPointReader> readers)
+      throws IOException {
+    PriorityMergeReader merged = new PriorityMergeReader();
+    for (int i = 0; i < readers.size(); i++) {
+      merged.addReaderWithPriority(readers.get(i), i);
+    }
+    return merged;
+  }
+
+  /**
+   * Generates lineCount arrays, each holding consecutive values from a random non-negative
+   * start.
+   */
+  private List<long[]> genData(int lineCount, int valueCountEachLine) {
+    Random rand = new Random();
+    List<long[]> data = new ArrayList<>();
+    for (int i = 0; i < lineCount; i++) {
+      long[] tmp = new long[valueCountEachLine];
+      long start = rand.nextInt(Integer.MAX_VALUE);
+      for (int j = 0; j < valueCountEachLine; j++) {
+        tmp[j] = start++;
+      }
+      data.add(tmp);
+    }
+    return data;
+  }
+
+  private List<IPointReader> genReaders(List<long[]> data) {
+    List<IPointReader> readerList = new ArrayList<>();
+    for (int i = 0; i < data.size(); i++) {
+      readerList.add(new FakedSeriesReader(data.get(i), i));
+    }
+    return readerList;
+  }
+
+  /**
+   * Asserts that both readers produce the same timestamps and values and end together.
+   */
+  private void check(IPointReader reader1, IPointReader reader2) throws IOException {
+    while (reader1.hasNext() && reader2.hasNext()) {
+      TimeValuePair tv1 = reader1.next();
+      TimeValuePair tv2 = reader2.next();
+      Assert.assertEquals(tv1.getTimestamp(), tv2.getTimestamp());
+      Assert.assertEquals(tv1.getValue(), tv2.getValue());
+    }
+    Assert.assertFalse(reader2.hasNext());
+    Assert.assertFalse(reader1.hasNext());
+  }
+
+  private List<IPointReader> genSimple() {
+    IPointReader reader1 = new FakedSeriesReader(new long[]{1, 2, 3, 4, 5}, 1L);
+    IPointReader reader2 = new FakedSeriesReader(new long[]{1, 5, 6, 7, 8}, 2L);
+    IPointReader reader3 = new FakedSeriesReader(new long[]{4, 5, 6, 7, 10}, 3L);
+
+    List<IPointReader> readerList = new ArrayList<>();
+    readerList.add(reader1);
+    readerList.add(reader2);
+    readerList.add(reader3);
+    return readerList;
+  }
+
+  /**
+   * Removes the test directory and everything below it. A directory that was never created is
+   * not an error.
+   */
+  private void deleteDir() throws IOException {
+    deleteRecursively(new File(baseDir));
+  }
+
+  private static void deleteRecursively(File file) throws IOException {
+    // listFiles() returns null when the path is missing or is not a directory.
+    File[] children = file.listFiles();
+    if (children != null) {
+      for (File child : children) {
+        deleteRecursively(child);
+      }
+    }
+    // File.delete() refuses non-empty directories, hence the depth-first order.
+    if (file.exists() && !file.delete()) {
+      throw new IOException("delete tmp file dir error");
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/externalsort/SimpleTimeValuePairSerializerTest.java b/server/src/test/java/org/apache/iotdb/db/query/externalsort/SimpleTimeValuePairSerializerTest.java
new file mode 100644
index 0000000..4c14e7b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/query/externalsort/SimpleTimeValuePairSerializerTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.externalsort;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairDeserializer;
+import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairSerializer;
+import org.apache.iotdb.db.query.externalsort.serialize.impl.FixLengthTimeValuePairDeserializer;
+import org.apache.iotdb.db.query.externalsort.serialize.impl.FixLengthTimeValuePairSerializer;
+import org.apache.iotdb.db.query.externalsort.serialize.impl.SimpleTimeValuePairDeserializer;
+import org.apache.iotdb.db.query.externalsort.serialize.impl.SimpleTimeValuePairSerializer;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Created by zhangjinrui on 2018/1/20.
+ */
+public class SimpleTimeValuePairSerializerTest {
+
+ private enum Type {
+ SIMPLE, FIX_LENGTH
+ }
+
+ @Test
+ public void testSIMPLE() throws IOException, ClassNotFoundException {
+ String rootPath = "d1";
+ String filePath = rootPath + "/d2/d3/tmpFile1";
+ int count = 10000;
+ testReadWrite(genTimeValuePairs(count), count, rootPath, filePath, Type.SIMPLE);
+ }
+
+ @Test
+ public void testFIX_LENGTH() throws IOException, ClassNotFoundException {
+ String rootPath = "tmpFile2";
+ String filePath = rootPath;
+ int count = 10000;
+ testReadWrite(genTimeValuePairs(count, TSDataType.BOOLEAN), count, rootPath, filePath, Type.FIX_LENGTH);
+ testReadWrite(genTimeValuePairs(count, TSDataType.INT32), count, rootPath, filePath, Type.FIX_LENGTH);
+ testReadWrite(genTimeValuePairs(count, TSDataType.INT64), count, rootPath, filePath, Type.FIX_LENGTH);
+ testReadWrite(genTimeValuePairs(count, TSDataType.FLOAT), count, rootPath, filePath, Type.FIX_LENGTH);
+ testReadWrite(genTimeValuePairs(count, TSDataType.DOUBLE), count, rootPath, filePath, Type.FIX_LENGTH);
+ testReadWrite(genTimeValuePairs(count, TSDataType.TEXT), count, rootPath, filePath, Type.FIX_LENGTH);
+ }
+
+ private void testReadWrite(TimeValuePair[] timeValuePairs, int count, String rootPath, String filePath, Type type) throws IOException, ClassNotFoundException {
+ TimeValuePairSerializer serializer = null;
+ if (type == Type.SIMPLE) {
+ serializer = new SimpleTimeValuePairSerializer(filePath);
+ } else if (type == Type.FIX_LENGTH) {
+ serializer = new FixLengthTimeValuePairSerializer(filePath);
+ }
+
+ for (TimeValuePair timeValuePair : timeValuePairs) {
+ serializer.write(timeValuePair);
+ }
+ serializer.close();
+
+ TimeValuePairDeserializer deserializer = null;
+ if (type == Type.SIMPLE) {
+ deserializer = new SimpleTimeValuePairDeserializer(filePath);
+ } else if (type == Type.FIX_LENGTH) {
+ deserializer = new FixLengthTimeValuePairDeserializer(filePath);
+ }
+
+ int idx = 0;
+ while (deserializer.hasNext()) {
+ TimeValuePair timeValuePair = deserializer.next();
+ Assert.assertEquals(timeValuePairs[idx].getValue(), timeValuePair.getValue());
+ Assert.assertEquals(timeValuePairs[idx].getTimestamp(), timeValuePair.getTimestamp());
+ idx++;
+ }
+ Assert.assertEquals(count, idx);
+ deserializer.close();
+ deleteFileRecursively(new File(rootPath));
+ }
+
+ private void deleteFileRecursively(File file) throws IOException {
+ if (!file.exists()) {
+ return;
+ }
+ if (file.isDirectory()) {
+ for (File f : file.listFiles()) {
+ deleteFileRecursively(f);
+ }
+ }
+ if (!file.delete())
+ throw new IOException("Failed to delete file: " + file);
+ }
+
+ private TimeValuePair[] genTimeValuePairs(int count) {
+ TimeValuePair[] timeValuePairs = new TimeValuePair[count];
+ for (int i = 0; i < count; i++) {
+ timeValuePairs[i] = new TimeValuePair(i, new TsPrimitiveType.TsInt(i));
+ }
+ return timeValuePairs;
+ }
+
+ private TimeValuePair[] genTimeValuePairs(int count, TSDataType dataType) {
+ TimeValuePair[] timeValuePairs = new TimeValuePair[count];
+ for (int i = 0; i < count; i++) {
+ switch (dataType) {
+ case BOOLEAN:
+ timeValuePairs[i] = new TimeValuePair(i, new TsPrimitiveType.TsBoolean(i % 2 == 0 ? true : false));
+ break;
+ case INT32:
+ timeValuePairs[i] = new TimeValuePair(i, new TsPrimitiveType.TsInt(i));
+ break;
+ case INT64:
+ timeValuePairs[i] = new TimeValuePair(i, new TsPrimitiveType.TsLong(i));
+ break;
+ case FLOAT:
+ timeValuePairs[i] = new TimeValuePair(i, new TsPrimitiveType.TsFloat(i + 0.1f));
+ break;
+ case DOUBLE:
+ timeValuePairs[i] = new TimeValuePair(i, new TsPrimitiveType.TsDouble(i + 0.12));
+ break;
+ case TEXT:
+ timeValuePairs[i] = new TimeValuePair(i, new TsPrimitiveType.TsBinary(new Binary(String.valueOf(i))));
+ break;
+ }
+ }
+ return timeValuePairs;
+ }
+
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/universal/FakedSeriesReader.java b/server/src/test/java/org/apache/iotdb/db/query/reader/universal/FakedSeriesReader.java
new file mode 100644
index 0000000..4c3cd1d
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/universal/FakedSeriesReader.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.reader.universal;
+
+import java.io.IOException;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TsPrimitiveType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+/**
+ * Test-only {@link IPointReader} that produces INT64 points in one of two modes, chosen by the
+ * constructor:
+ * <ul>
+ * <li>time-list mode: one point per entry of a given timestamp array, all with the same value;
+ * <li>generated mode: {@code size} points starting at {@code startTime}, spaced {@code interval}
+ * apart, each with the value {@code time % modValue}.
+ * </ul>
+ */
+public class FakedSeriesReader implements IPointReader {
+
+  private static final TSDataType DATA_TYPE = TSDataType.INT64;
+
+  private int index;
+  private final int size;
+  private final boolean initWithTimeList;
+
+  // init with time list and value
+  private long[] timestamps;
+  private long value;
+
+  // init with startTime, size, interval and modValue
+  private long startTime;
+  private int interval;
+  private int modValue;
+
+  /**
+   * Time-list mode.
+   *
+   * @param timestamps timestamps of the points, returned in array order
+   * @param value the value carried by every point
+   */
+  public FakedSeriesReader(long[] timestamps, long value) {
+    this.initWithTimeList = true;
+    this.index = 0;
+    this.size = timestamps.length;
+    this.timestamps = timestamps;
+    this.value = value;
+  }
+
+  /**
+   * Generated mode.
+   *
+   * @param startTime timestamp of the first point
+   * @param size number of points to produce
+   * @param interval distance between two consecutive timestamps
+   * @param modValue divisor used to derive each value as {@code time % modValue}
+   */
+  public FakedSeriesReader(long startTime, int size, int interval, int modValue) {
+    this.initWithTimeList = false;
+    this.index = 0;
+    this.size = size;
+    this.startTime = startTime;
+    this.interval = interval;
+    this.modValue = modValue;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return index < size;
+  }
+
+  /**
+   * Returns the next point and advances the reader.
+   *
+   * @throws java.util.NoSuchElementException if all {@code size} points were already returned
+   */
+  @Override
+  public TimeValuePair next() {
+    // both modes must stop at size; generated mode would otherwise keep inventing points
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException(
+          "FakedSeriesReader is exhausted after " + size + " points");
+    }
+    if (initWithTimeList) {
+      return new TimeValuePair(timestamps[index++], TsPrimitiveType.getByType(DATA_TYPE, value));
+    } else {
+      long time = startTime;
+      startTime += interval;
+      index++;
+      return new TimeValuePair(time,
+          TsPrimitiveType.getByType(DATA_TYPE, time % modValue));
+    }
+  }
+
+  /**
+   * Not supported by this fake.
+   *
+   * @throws IOException always
+   */
+  @Override
+  public TimeValuePair current() throws IOException {
+    throw new IOException("current() in FakedSeriesReader is an empty method.");
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void close() {
+  }
+
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest.java
index 6461cca..656ae94 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest.java
@@ -65,36 +65,4 @@ public class PriorityMergeReaderTest {
i++;
}
}
-
- public static class FakedSeriesReader implements IPointReader {
-
- private long[] timestamps;
- private int index;
- private long value;
-
- FakedSeriesReader(long[] timestamps, long value) {
- this.timestamps = timestamps;
- index = 0;
- this.value = value;
- }
-
- @Override
- public boolean hasNext() {
- return index < timestamps.length;
- }
-
- @Override
- public TimeValuePair next() {
- return new TimeValuePair(timestamps[index++], new TsPrimitiveType.TsLong(value));
- }
-
- @Override
- public TimeValuePair current() {
- return new TimeValuePair(timestamps[index], new TsPrimitiveType.TsLong(value));
- }
-
- @Override
- public void close() {
- }
- }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest2.java b/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest2.java
index afeed42..f2cc207 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest2.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest2.java
@@ -34,9 +34,9 @@ public class PriorityMergeReaderTest2 {
@Test
public void test() throws IOException {
- FakedPrioritySeriesReader reader1 = new FakedPrioritySeriesReader(100, 80, 5, 11);
- FakedPrioritySeriesReader reader2 = new FakedPrioritySeriesReader(150, 60, 6, 19);
- FakedPrioritySeriesReader reader3 = new FakedPrioritySeriesReader(180, 50, 7, 31);
+ FakedSeriesReader reader1 = new FakedSeriesReader(100, 80, 5, 11);
+ FakedSeriesReader reader2 = new FakedSeriesReader(150, 60, 6, 19);
+ FakedSeriesReader reader3 = new FakedSeriesReader(180, 50, 7, 31);
PriorityMergeReader priorityMergeReader = new PriorityMergeReader();
priorityMergeReader.addReaderWithPriority(reader1, 3);
@@ -61,42 +61,4 @@ public class PriorityMergeReaderTest2 {
}
Assert.assertEquals(162, cnt);
}
-
- public static class FakedPrioritySeriesReader implements IPointReader {
-
- private Iterator<TimeValuePair> iterator;
-
- FakedPrioritySeriesReader(long startTime, int size, int interval, int modValue) {
- long time = startTime;
- List<TimeValuePair> list = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- list.add(
- new TimeValuePair(time,
- TsPrimitiveType.getByType(TSDataType.INT64, time % modValue)));
- // System.out.println(time + "," + time % modValue);
- time += interval;
- }
- iterator = list.iterator();
- }
-
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- @Override
- public TimeValuePair next() {
- return iterator.next();
- }
-
- @Override
- public TimeValuePair current() throws IOException {
- throw new IOException("current() in FakedPrioritySeriesReader is an empty method.");
- }
-
-
- @Override
- public void close() {
- }
- }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 3a6a255..53f42f6 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -26,22 +26,20 @@ import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.adapter.CompressionRatio;
import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
import org.apache.iotdb.db.engine.flush.FlushManager;
-import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +50,6 @@ import org.slf4j.LoggerFactory;
* </p>
*
* @author liukun
- *
*/
public class EnvironmentUtils {
@@ -117,6 +114,8 @@ public class EnvironmentUtils {
cleanDir(config.getWalFolder());
// delete index
cleanDir(config.getIndexFileDir());
+ // delete query
+ cleanDir(config.getQueryDir());
cleanDir(config.getBaseDir());
// delete data files
for (String dataDir : config.getDataDirs()) {
@@ -129,16 +128,14 @@ public class EnvironmentUtils {
}
/**
- * disable the system monitor</br>
- * this function should be called before all code in the setup
+ * disable the system monitor.<br> This function should be called before all code in the setup.
*/
public static void closeStatMonitor() {
config.setEnableStatMonitor(false);
}
/**
- * disable memory control</br>
- * this function should be called before all code in the setup
+ * disable memory control.<br> This function should be called before all code in the setup.
*/
public static void envSetUp() throws StartupException, IOException {
IoTDBDescriptor.getInstance().getConfig().setEnableParameterAdapter(false);
@@ -181,8 +178,10 @@ public class EnvironmentUtils {
createDir(config.getWalFolder());
// create index
createDir(config.getIndexFileDir());
+ // create query
+ createDir(config.getQueryDir());
// create data
- for (String dataDir: config.getDataDirs()) {
+ for (String dataDir : config.getDataDirs()) {
createDir(dataDir);
}
}