You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2019/08/13 02:11:15 UTC
[incubator-iotdb] branch external_sort updated: ReDesign External
Sort
This is an automated email from the ASF dual-hosted git repository.
suyue pushed a commit to branch external_sort
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/external_sort by this push:
new d8939e1 ReDesign External Sort
d8939e1 is described below
commit d8939e1d99ed8dabb6918e4b5c3177c817f965d0
Author: suyue <23...@qq.com>
AuthorDate: Tue Aug 13 10:10:41 2019 +0800
ReDesign External Sort
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../db/query/control/QueryResourceManager.java | 20 +++--
.../query/externalsort/ExternalSortJobEngine.java | 27 ++++--
.../db/query/externalsort/ExternalSortJobPart.java | 4 +-
.../iotdb/db/query/externalsort/LineMerger.java | 7 +-
.../MultiSourceExternalSortJobPart.java | 6 +-
.../externalsort/SimpleExternalSortEngine.java | 73 +++++++++++++---
.../SingleSourceExternalSortJobPart.java | 24 +++---
.../adapter/ByTimestampReaderAdapter.java | 78 +++++++++++++++++
.../serialize/TimeValuePairDeserializer.java | 3 +-
.../impl/FixLengthTimeValuePairDeserializer.java | 5 ++
.../impl/SimpleTimeValuePairDeserializer.java | 10 ++-
.../query/reader/chunkRelated/ChunkReaderWrap.java | 98 ++++++++++++++++++++++
.../resourceRelated/UnseqResourceMergeReader.java | 45 +++-------
.../UnseqResourceReaderByTimestamp.java | 39 ++++-----
.../reader/universal/PriorityMergeReader.java | 10 +--
.../query/externalsort/ExternalSortEngineTest.java | 34 ++++----
.../query/externalsort/FakeChunkReaderWrap.java} | 29 +++----
.../reader/universal/PriorityMergeReaderTest2.java | 6 --
19 files changed, 372 insertions(+), 148 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5c6bdf1..7161072 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -104,7 +104,7 @@ public class IoTDBConfig {
private String schemaDir = "data/system/schema";
/**
- * Query directory, stores temporary files for query
+ * Query directory, stores temporary files of query
*/
private String queryDir = "data/query";
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
index 3f3b818..60b5776 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairDeserializer;
-import org.apache.iotdb.db.query.externalsort.serialize.impl.FixLengthTimeValuePairDeserializer;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
@@ -42,10 +41,16 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
* <p>
* QueryResourceManager manages resource (file streams) used by each query job, and assign Ids to
* the jobs. During the life cycle of a query, the following methods must be called in strict order:
- * 1. assignJobId - get an Id for the new job. 2. beginQueryOfGivenQueryPaths - remind StorageEngine
- * that some files are being used 3. (if using filter)beginQueryOfGivenExpression - remind
- * StorageEngine that some files are being used 4. getQueryDataSource - open files for the job or
- * reuse existing readers. 5. endQueryForGivenJob - putBack the resource used by this job.
+ * 1. assignJobId - get an Id for the new job.
+ *
+ * 2. beginQueryOfGivenQueryPaths - remind StorageEngine that some files are being used
+ *
+ * 3. (if using filter)beginQueryOfGivenExpression - remind StorageEngine that some files are being
+ * used
+ *
+ * 4. getQueryDataSource - open files for the job or reuse existing readers.
+ *
+ * 5. endQueryForGivenJob - putBack the resource used by this job.
* </p>
*/
public class QueryResourceManager {
@@ -77,8 +82,7 @@ public class QueryResourceManager {
*
* <code>StorageEngine.getInstance().endQueryForGivenJob(device_1, 1)</code> and
* <code>StorageEngine.getInstance().endQueryForGivenJob(device_2, 2)</code> must be invoked no
- * matter how
- * query process Q1 exits normally or abnormally. So is Q2,
+ * matter how query process Q1 exits normally or abnormally. So is Q2,
* <code>StorageEngine.getInstance().endQueryForGivenJob(device_1, 3)</code> and
* <code>StorageEngine.getInstance().endQueryForGivenJob(device_2, 4)</code> must be invoked
*
@@ -91,7 +95,9 @@ public class QueryResourceManager {
private JobFileManager filePathsManager;
private AtomicLong maxJobId;
/**
+ * Record temporary files used for external sorting.
*
+ * Key: query job id. Value: temporary file list used for external sorting.
*/
private ConcurrentHashMap<Long, List<TimeValuePairDeserializer>> externalSortFileMap;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobEngine.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobEngine.java
index f9f8b11..c601ff4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobEngine.java
@@ -22,25 +22,34 @@ package org.apache.iotdb.db.query.externalsort;
import java.io.IOException;
import java.util.List;
import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.chunkRelated.ChunkReaderWrap;
public interface ExternalSortJobEngine {
/**
- * Receive a list of TimeValuePairReaders and judge whether it should be processed using external
- * sort. If needed, do the merge sort for all TimeValuePairReaders using specific strategy.
+ * Receive a list of ChunkReaderWraps and judge whether it should be processed using external
+ * sort. If needed, do the merge sort for all ChunkReaderWraps using specific strategy.
*
- * @param timeValuePairReaderList A list include a set of TimeValuePairReaders
+ * @param chunkReaderWraps A list include a set of ChunkReaderWraps
*/
- List<IPointReader> executeWithGlobalTimeFilter(long queryId, List<IPointReader>
- timeValuePairReaderList, int startPriority) throws
- IOException;
+ List<IPointReader> executeForIPointReader(long queryId, List<ChunkReaderWrap>
+ chunkReaderWraps) throws IOException;
+
+
+ /**
+ * Receive a list of chunkReaderWraps and judge whether it should be processed using external
+ * sort. If needed, do the merge sort for all ChunkReaderWraps using specific strategy.
+ *
+ * @param chunkReaderWraps A list include a set of ChunkReaderWraps
+ */
+ List<IReaderByTimestamp> executeForByTimestampReader(long queryId, List<ChunkReaderWrap>
+ chunkReaderWraps) throws IOException;
/**
* Create an external sort job which contains many parts.
*/
- ExternalSortJob createJob(long queryId, List<IPointReader> timeValuePairReaderList,
- int startPriority)
- throws IOException;
+ ExternalSortJob createJob(long queryId, List<ChunkReaderWrap> timeValuePairReaderList) throws IOException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobPart.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobPart.java
index a783aaa..713545f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobPart.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/ExternalSortJobPart.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.query.externalsort;
import java.io.IOException;
-import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.IPointReader;
public abstract class ExternalSortJobPart {
@@ -31,7 +31,7 @@ public abstract class ExternalSortJobPart {
this.type = type;
}
- public abstract PriorityMergeReader executeWithGlobalTimeFilter() throws IOException;
+ public abstract IPointReader executeWithGlobalTimeFilter() throws IOException;
public ExternalSortJobPartType getType() {
return type;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/LineMerger.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/LineMerger.java
index cf5d1d6..d9a0031 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/LineMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/LineMerger.java
@@ -26,6 +26,7 @@
import org.apache.iotdb.db.query.externalsort.serialize.TimeValuePairSerializer;
import org.apache.iotdb.db.query.externalsort.serialize.impl.FixLengthTimeValuePairDeserializer;
import org.apache.iotdb.db.query.externalsort.serialize.impl.FixLengthTimeValuePairSerializer;
+ import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
@@ -39,10 +40,10 @@
this.queryId = queryId;
}
- public PriorityMergeReader merge(List<PriorityMergeReader> prioritySeriesReaders)
+ public IPointReader merge(List<IPointReader> prioritySeriesReaders)
throws IOException {
TimeValuePairSerializer serializer = new FixLengthTimeValuePairSerializer(tmpFilePath);
- PriorityMergeReader reader = new PriorityMergeReader(prioritySeriesReaders);
+ PriorityMergeReader reader = new PriorityMergeReader(prioritySeriesReaders, 1);
while (reader.hasNext()) {
serializer.write(reader.next());
}
@@ -50,6 +51,6 @@
serializer.close();
TimeValuePairDeserializer deserializer = new FixLengthTimeValuePairDeserializer(tmpFilePath);
QueryResourceManager.getInstance().registerTempExternalSortFile(queryId, deserializer);
- return new PriorityMergeReader(deserializer, prioritySeriesReaders.get(0).getPriority());
+ return deserializer;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/MultiSourceExternalSortJobPart.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/MultiSourceExternalSortJobPart.java
index 76bbfbf..6143109 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/MultiSourceExternalSortJobPart.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/MultiSourceExternalSortJobPart.java
@@ -22,7 +22,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
- import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+ import org.apache.iotdb.db.query.reader.IPointReader;
public class MultiSourceExternalSortJobPart extends ExternalSortJobPart {
@@ -50,8 +50,8 @@
}
@Override
- public PriorityMergeReader executeWithGlobalTimeFilter() throws IOException {
- List<PriorityMergeReader> prioritySeriesReaders = new ArrayList<>();
+ public IPointReader executeWithGlobalTimeFilter() throws IOException {
+ List<IPointReader> prioritySeriesReaders = new ArrayList<>();
for (ExternalSortJobPart part : source) {
prioritySeriesReaders.add(part.executeWithGlobalTimeFilter());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/SimpleExternalSortEngine.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/SimpleExternalSortEngine.java
index ab94fc2..c8f1e43 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/SimpleExternalSortEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/SimpleExternalSortEngine.java
@@ -26,8 +26,10 @@
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineFailureException;
+ import org.apache.iotdb.db.query.externalsort.adapter.ByTimestampReaderAdapter;
import org.apache.iotdb.db.query.reader.IPointReader;
- import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+ import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
+ import org.apache.iotdb.db.query.reader.chunkRelated.ChunkReaderWrap;
public class SimpleExternalSortEngine implements ExternalSortJobEngine {
@@ -59,30 +61,38 @@
}
@Override
- public List<IPointReader> executeWithGlobalTimeFilter(long queryId, List<IPointReader> readers,
- int startPriority)
+ public List<IPointReader> executeForIPointReader(long queryId,
+ List<ChunkReaderWrap> chunkReaderWraps)
throws IOException {
- if (readers.size() < minExternalSortSourceCount) {
- return readers;
+ if (chunkReaderWraps.size() < minExternalSortSourceCount) {
+ return generateIPointReader(chunkReaderWraps, 0, chunkReaderWraps.size());
}
- ExternalSortJob job = createJob(queryId, readers, startPriority);
+ ExternalSortJob job = createJob(queryId, chunkReaderWraps);
return job.executeWithGlobalTimeFilter();
}
+ @Override
+ public List<IReaderByTimestamp> executeForByTimestampReader(long queryId,
+ List<ChunkReaderWrap> chunkReaderWraps) throws IOException {
+ if (chunkReaderWraps.size() < minExternalSortSourceCount) {
+ return generateIReaderByTimestamp(chunkReaderWraps, 0, chunkReaderWraps.size());
+ }
+ ExternalSortJob job = createJob(queryId, chunkReaderWraps);
+ return convert(job.executeWithGlobalTimeFilter());
+ }
+
//TODO: this method could be optimized to have a better performance
@Override
- public ExternalSortJob createJob(long queryId, List<IPointReader> readers, int startPriority)
- throws IOException {
+ public ExternalSortJob createJob(long queryId, List<ChunkReaderWrap> readerWrapList) {
long jodId = scheduler.genJobId();
List<ExternalSortJobPart> ret = new ArrayList<>();
- List<ExternalSortJobPart> tmpPartList = new ArrayList<>();
- for (IPointReader reader : readers) {
- ret.add(
- new SingleSourceExternalSortJobPart(new PriorityMergeReader(reader, startPriority++)));
+ for (ChunkReaderWrap readerWrap : readerWrapList) {
+ ret.add(new SingleSourceExternalSortJobPart(readerWrap));
}
int partId = 0;
while (ret.size() >= minExternalSortSourceCount) {
+ List<ExternalSortJobPart> tmpPartList = new ArrayList<>();
for (int i = 0; i < ret.size(); ) {
List<ExternalSortJobPart> partGroup = new ArrayList<>();
for (int j = 0; j < minExternalSortSourceCount && i < ret.size(); j++) {
@@ -97,11 +107,48 @@
partId++;
}
ret = tmpPartList;
- tmpPartList = new ArrayList<>();
}
return new ExternalSortJob(jodId, ret);
}
+ /**
+ * init IPointReader with ChunkReaderWrap.
+ */
+ private List<IPointReader> generateIPointReader(List<ChunkReaderWrap> readerWraps,
+ final int start, final int size) throws IOException {
+ List<IPointReader> pointReaderList = new ArrayList<>();
+ for (int i = start; i < start + size; i++) {
+ pointReaderList.add(readerWraps.get(i).getIPointReader());
+ }
+ return pointReaderList;
+ }
+
+ /**
+ * init IReaderByTimestamp with ChunkReaderWrap.
+ */
+ private List<IReaderByTimestamp> generateIReaderByTimestamp(List<ChunkReaderWrap> readerWraps,
+ final int start, final int size) throws IOException {
+ List<IReaderByTimestamp> readerByTimestampList = new ArrayList<>();
+ for (int i = start; i < start + size; i++) {
+ readerByTimestampList.add(readerWraps.get(i).getIReaderByTimestamp());
+ }
+ return readerByTimestampList;
+ }
+
+ /**
+ * convert IPointReader to implement interface of IReaderByTimestamp.
+ *
+ * @param pointReaderList reader list that implements IPointReader
+ * @return reader list that implements IReaderByTimestamp
+ */
+ private List<IReaderByTimestamp> convert(List<IPointReader> pointReaderList) {
+ List<IReaderByTimestamp> readerByTimestampList = new ArrayList<>();
+ for (IPointReader pointReader : pointReaderList) {
+ readerByTimestampList.add(new ByTimestampReaderAdapter(pointReader));
+ }
+ return readerByTimestampList;
+ }
+
private static class SimpleExternalSortJobEngineHelper {
private static SimpleExternalSortEngine INSTANCE = new SimpleExternalSortEngine();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/SingleSourceExternalSortJobPart.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/SingleSourceExternalSortJobPart.java
index fe278b1..527eb39 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/SingleSourceExternalSortJobPart.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/SingleSourceExternalSortJobPart.java
@@ -19,20 +19,24 @@
*/
package org.apache.iotdb.db.query.externalsort;
- import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
+ import java.io.IOException;
+ import org.apache.iotdb.db.query.reader.IPointReader;
+ import org.apache.iotdb.db.query.reader.chunkRelated.ChunkReaderWrap;
public class SingleSourceExternalSortJobPart extends ExternalSortJobPart {
- private PriorityMergeReader timeValuePairReader;
+ private IPointReader timeValuePairReader;
+ private ChunkReaderWrap chunkReaderWrap;
- public SingleSourceExternalSortJobPart(PriorityMergeReader timeValuePairReader) {
- super(ExternalSortJobPartType.SINGLE_SOURCE);
- this.timeValuePairReader = timeValuePairReader;
- }
+ public SingleSourceExternalSortJobPart(ChunkReaderWrap chunkReaderWrap) {
+ super(ExternalSortJobPartType.SINGLE_SOURCE);
+ this.chunkReaderWrap = chunkReaderWrap;
+ }
- @Override
- public PriorityMergeReader executeWithGlobalTimeFilter() {
- return this.timeValuePairReader;
- }
+ @Override
+ public IPointReader executeWithGlobalTimeFilter() throws IOException {
+ timeValuePairReader = chunkReaderWrap.getIPointReader();
+ return this.timeValuePairReader;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
new file mode 100644
index 0000000..f572faf
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/adapter/ByTimestampReaderAdapter.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.query.externalsort.adapter;
+
+import java.io.IOException;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
+import org.apache.iotdb.db.utils.TimeValuePair;
+
+/**
+ * This class is a adapter which makes IPointReader implement IReaderByTimestamp interface.
+ */
+public class ByTimestampReaderAdapter implements IReaderByTimestamp {
+
+ private IPointReader pointReader;
+ private boolean hasCached;
+ private TimeValuePair pair;
+
+ public ByTimestampReaderAdapter(IPointReader pointReader) {
+ this.pointReader = pointReader;
+ }
+
+ @Override
+ public Object getValueInTimestamp(long timestamp) throws IOException {
+ if (hasCached) {
+ if (pair.getTimestamp() < timestamp) {
+ hasCached = false;
+ } else if (pair.getTimestamp() == timestamp) {
+ hasCached = false;
+ return pair.getValue().getValue();
+ } else {
+ return null;
+ }
+ }
+
+ while (pointReader.hasNext()) {
+ pair = pointReader.next();
+ if (pair.getTimestamp() >= timestamp) {
+ hasCached = true;
+ break;
+ }
+ }
+
+ if (!hasCached) {
+ return null;
+ }
+
+ if (pair.getTimestamp() == timestamp) {
+ hasCached = false;
+ return pair.getValue().getValue();
+ } else {
+ return null;
+ }
+
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return pointReader.hasNext();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairDeserializer.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairDeserializer.java
index ab11325..4e28371 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairDeserializer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/TimeValuePairDeserializer.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.db.query.externalsort.serialize;
import java.io.IOException;
import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
import org.apache.iotdb.db.utils.TimeValuePair;
-public interface TimeValuePairDeserializer extends IPointReader {
+public interface TimeValuePairDeserializer extends IPointReader, IReaderByTimestamp {
@Override
default TimeValuePair current() throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairDeserializer.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairDeserializer.java
index 8abd7e1..ae09f75 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairDeserializer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/FixLengthTimeValuePairDeserializer.java
@@ -49,6 +49,11 @@ public class FixLengthTimeValuePairDeserializer implements TimeValuePairDeserial
}
@Override
+ public Object getValueInTimestamp(long timestamp) throws IOException {
+ return null;
+ }
+
+ @Override
public boolean hasNext() throws IOException {
return inputStream.available() > 0;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairDeserializer.java b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairDeserializer.java
index 33a1ff7..6f1cb3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairDeserializer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/externalsort/serialize/impl/SimpleTimeValuePairDeserializer.java
@@ -47,6 +47,11 @@ public class SimpleTimeValuePairDeserializer implements TimeValuePairDeserialize
}
@Override
+ public Object getValueInTimestamp(long timestamp) throws IOException {
+ return null;
+ }
+
+ @Override
public boolean hasNext() throws IOException {
return inputStream.available() > 0;
}
@@ -67,8 +72,11 @@ public class SimpleTimeValuePairDeserializer implements TimeValuePairDeserialize
*/
@Override
public void close() throws IOException {
- objectInputStream.close();
File file = new File(tmpFilePath);
+ if (!file.exists()) {
+ return;
+ }
+ objectInputStream.close();
if (!file.delete()) {
throw new IOException("Delete external sort tmp file error. FilePath:" + tmpFilePath);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
new file mode 100644
index 0000000..b98617c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/ChunkReaderWrap.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.reader.chunkRelated;
+
+import java.io.IOException;
+import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithFilter;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
+
+public class ChunkReaderWrap {
+
+ private ChunkReaderType type;
+ private Filter filter;
+
+ // attributes for disk chunk
+ private ChunkMetaData chunkMetaData;
+ private ChunkLoader chunkLoader;
+
+ // attributes for mem chunk
+ private ReadOnlyMemChunk readOnlyMemChunk;
+
+ /**
+ * This is used in test.
+ */
+ protected ChunkReaderWrap() {
+
+ }
+
+ /**
+ * constructor of diskChunkReader
+ */
+ public ChunkReaderWrap(ChunkMetaData metaData, ChunkLoader chunkLoader, Filter filter) {
+ this.type = ChunkReaderType.DISK_CHUNK;
+ this.chunkMetaData = metaData;
+ this.chunkLoader = chunkLoader;
+ this.filter = filter;
+ }
+
+ /**
+ * constructor of MemChunkReader
+ */
+ public ChunkReaderWrap(ReadOnlyMemChunk readOnlyMemChunk, Filter filter) {
+ type = ChunkReaderType.MEM_CHUNK;
+ this.readOnlyMemChunk = readOnlyMemChunk;
+ this.filter = filter;
+ }
+
+ public IPointReader getIPointReader() throws IOException {
+ if (type.equals(ChunkReaderType.DISK_CHUNK)) {
+ Chunk chunk = chunkLoader.getChunk(chunkMetaData);
+ ChunkReader chunkReader = filter != null ? new ChunkReaderWithFilter(chunk, filter)
+ : new ChunkReaderWithoutFilter(chunk);
+
+ return new DiskChunkReader(chunkReader);
+ } else {
+ return new MemChunkReader(readOnlyMemChunk, filter);
+ }
+ }
+
+ public IReaderByTimestamp getIReaderByTimestamp() throws IOException {
+ if (type.equals(ChunkReaderType.DISK_CHUNK)) {
+ Chunk chunk = chunkLoader.getChunk(chunkMetaData);
+ ChunkReaderByTimestamp chunkReader = new ChunkReaderByTimestamp(chunk);
+ return new DiskChunkReaderByTimestamp(chunkReader);
+ } else {
+ return new MemChunkReaderByTimestamp(readOnlyMemChunk);
+ }
+ }
+
+ enum ChunkReaderType {
+ DISK_CHUNK, MEM_CHUNK
+ }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
index 939bb88..9f72be3 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
@@ -29,21 +29,16 @@ import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.externalsort.ExternalSortJobEngine;
import org.apache.iotdb.db.query.externalsort.SimpleExternalSortEngine;
import org.apache.iotdb.db.query.reader.IPointReader;
-import org.apache.iotdb.db.query.reader.chunkRelated.DiskChunkReader;
-import org.apache.iotdb.db.query.reader.chunkRelated.MemChunkReader;
+import org.apache.iotdb.db.query.reader.chunkRelated.ChunkReaderWrap;
import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithFilter;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
/**
* To read a list of unsequence TsFiles, this class extends {@link PriorityMergeReader} to
@@ -65,8 +60,7 @@ public class UnseqResourceMergeReader extends PriorityMergeReader {
this.seriesPath = seriesPath;
this.queryId = context.getJobId();
- int priorityValue = 1;
- List<IPointReader> priorityReaderList = new ArrayList<>();
+ List<ChunkReaderWrap> readerWrapList = new ArrayList<>();
for (TsFileResource tsFileResource : unseqResources) {
// prepare metaDataList
@@ -93,7 +87,6 @@ public class UnseqResourceMergeReader extends PriorityMergeReader {
ChunkLoaderImpl chunkLoader = null;
if (!metaDataList.isEmpty()) {
- // create and add ChunkReader with priority
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
.get(tsFileResource.getFile().getPath(), tsFileResource.isClosed());
chunkLoader = new ChunkLoaderImpl(tsFileReader);
@@ -111,37 +104,23 @@ public class UnseqResourceMergeReader extends PriorityMergeReader {
continue;
}
}
-
- Chunk chunk = chunkLoader.getChunk(chunkMetaData);
- ChunkReader chunkReader = filter != null ? new ChunkReaderWithFilter(chunk, filter)
- : new ChunkReaderWithoutFilter(chunk);
-
- priorityReaderList.add(new DiskChunkReader(chunkReader));
+ // create and add DiskChunkReader
+ readerWrapList.add(new ChunkReaderWrap(chunkMetaData, chunkLoader, filter));
}
if (!tsFileResource.isClosed()) {
- // create and add MemChunkReader with priority
- priorityReaderList.add(new MemChunkReader(tsFileResource.getReadOnlyMemChunk(), filter));
+ // create and add MemChunkReader
+ readerWrapList.add(new ChunkReaderWrap(tsFileResource.getReadOnlyMemChunk(), filter));
}
}
- if (shouldUseExternalSort()) {
- ExternalSortJobEngine externalSortJobEngine = SimpleExternalSortEngine.getInstance();
- List<IPointReader> readerList = externalSortJobEngine
- .executeWithGlobalTimeFilter(queryId, priorityReaderList, priorityValue);
- for (IPointReader chunkReader : readerList) {
- addReaderWithPriority(chunkReader, priorityValue++);
- }
- } else {
- for (IPointReader chunkReader : priorityReaderList) {
- addReaderWithPriority(chunkReader, priorityValue++);
- }
+ ExternalSortJobEngine externalSortJobEngine = SimpleExternalSortEngine.getInstance();
+ List<IPointReader> readerList = externalSortJobEngine
+ .executeForIPointReader(queryId, readerWrapList);
+ int priorityValue = 1;
+ for (IPointReader chunkReader : readerList) {
+ addReaderWithPriority(chunkReader, priorityValue++);
}
-
- }
-
- private boolean shouldUseExternalSort() {
- return false;
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceReaderByTimestamp.java
index 6a6123d..b590816 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceReaderByTimestamp.java
@@ -19,22 +19,23 @@
package org.apache.iotdb.db.query.reader.resourceRelated;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.query.reader.chunkRelated.DiskChunkReaderByTimestamp;
-import org.apache.iotdb.db.query.reader.chunkRelated.MemChunkReaderByTimestamp;
+import org.apache.iotdb.db.query.externalsort.ExternalSortJobEngine;
+import org.apache.iotdb.db.query.externalsort.SimpleExternalSortEngine;
+import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.chunkRelated.ChunkReaderWrap;
import org.apache.iotdb.db.query.reader.universal.PriorityMergeReaderByTimestamp;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
/**
* To read a list of unsequence TsFiles by timestamp, this class extends {@link
@@ -47,10 +48,12 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
*/
public class UnseqResourceReaderByTimestamp extends PriorityMergeReaderByTimestamp {
+ private long queryId;
+
public UnseqResourceReaderByTimestamp(Path seriesPath,
List<TsFileResource> unseqResources, QueryContext context) throws IOException {
- int priorityValue = 1;
-
+ this.queryId = context.getJobId();
+ List<ChunkReaderWrap> chunkReaderWrapList = new ArrayList<>();
for (TsFileResource tsFileResource : unseqResources) {
// prepare metaDataList
@@ -75,26 +78,24 @@ public class UnseqResourceReaderByTimestamp extends PriorityMergeReaderByTimesta
chunkLoader = new ChunkLoaderImpl(tsFileReader);
}
for (ChunkMetaData chunkMetaData : metaDataList) {
-
- Chunk chunk = chunkLoader.getChunk(chunkMetaData);
- ChunkReaderByTimestamp chunkReader = new ChunkReaderByTimestamp(chunk);
-
- addReaderWithPriority(new DiskChunkReaderByTimestamp(chunkReader),
- priorityValue);
-
- priorityValue++;
+ chunkReaderWrapList.add(new ChunkReaderWrap(chunkMetaData, chunkLoader, null));
}
if (!tsFileResource.isClosed()) {
- // create and add MemChunkReader with priority
- addReaderWithPriority(
- new MemChunkReaderByTimestamp(tsFileResource.getReadOnlyMemChunk()), priorityValue++);
+ // create and add MemChunkReader
+ chunkReaderWrapList.add(new ChunkReaderWrap(tsFileResource.getReadOnlyMemChunk(), null));
}
}
- // TODO add external sort when needed
-
// TODO future work: create reader when getValueInTimestamp so that resources
// whose start and end time do not satisfy can be skipped.
+
+ ExternalSortJobEngine externalSortJobEngine = SimpleExternalSortEngine.getInstance();
+ List<IReaderByTimestamp> readerList = externalSortJobEngine
+ .executeForByTimestampReader(queryId, chunkReaderWrapList);
+ int priorityValue = 1;
+ for (IReaderByTimestamp chunkReader : readerList) {
+ addReaderWithPriority(chunkReader, priorityValue++);
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index 428263e..c15893b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -37,13 +37,9 @@ public class PriorityMergeReader implements IPointReader {
public PriorityMergeReader() {
}
- public PriorityMergeReader(IPointReader reader, int priority) throws IOException {
- addReaderWithPriority(reader, priority);
- }
-
- public PriorityMergeReader(List<PriorityMergeReader> prioritySeriesReaders) throws IOException {
- for (PriorityMergeReader reader : prioritySeriesReaders) {
- addReaderWithPriority(reader, reader.getPriority());
+ public PriorityMergeReader(List<IPointReader> prioritySeriesReaders, int startPriority) throws IOException {
+ for (IPointReader reader : prioritySeriesReaders) {
+ addReaderWithPriority(reader, startPriority++);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/externalsort/ExternalSortEngineTest.java b/server/src/test/java/org/apache/iotdb/db/query/externalsort/ExternalSortEngineTest.java
index 33b9d22..0342ba0 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/externalsort/ExternalSortEngineTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/externalsort/ExternalSortEngineTest.java
@@ -27,6 +27,7 @@ import java.util.Random;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.chunkRelated.ChunkReaderWrap;
import org.apache.iotdb.db.query.reader.universal.FakedSeriesReader;
import org.apache.iotdb.db.query.reader.universal.PriorityMergeReader;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -51,15 +52,11 @@ public class ExternalSortEngineTest {
SimpleExternalSortEngine engine = new SimpleExternalSortEngine(baseDir + "/", 2);
List<IPointReader> readerList1 = genSimple();
List<IPointReader> readerList2 = genSimple();
- readerList1 = engine.executeWithGlobalTimeFilter(queryId, readerList1, 1);
- PriorityMergeReader reader1 = new PriorityMergeReader();
- for (int i = 0; i < readerList1.size(); i++) {
- reader1.addReaderWithPriority(readerList1.get(i), i);
- }
- PriorityMergeReader reader2 = new PriorityMergeReader();
- for (int i = 0; i < readerList2.size(); i++) {
- reader2.addReaderWithPriority(readerList2.get(i), i);
- }
+ List<ChunkReaderWrap> chunkReaderWrapList = new ArrayList<>();
+ readerList1.forEach(x -> chunkReaderWrapList.add(new FakeChunkReaderWrap(x)));
+ readerList1 = engine.executeForIPointReader(queryId, chunkReaderWrapList);
+ PriorityMergeReader reader1 = new PriorityMergeReader(readerList1, 1);
+ PriorityMergeReader reader2 = new PriorityMergeReader(readerList2, 1);
check(reader1, reader2);
reader1.close();
reader2.close();
@@ -74,15 +71,11 @@ public class ExternalSortEngineTest {
List<IPointReader> readerList1 = genReaders(data);
List<IPointReader> readerList2 = genReaders(data);
- readerList1 = engine.executeWithGlobalTimeFilter(queryId, readerList1, 1);
- PriorityMergeReader reader1 = new PriorityMergeReader();
- for (int i = 0; i < readerList1.size(); i++) {
- reader1.addReaderWithPriority(readerList1.get(i), i);
- }
- PriorityMergeReader reader2 = new PriorityMergeReader();
- for (int i = 0; i < readerList2.size(); i++) {
- reader2.addReaderWithPriority(readerList2.get(i), i);
- }
+ List<ChunkReaderWrap> chunkReaderWrapList = new ArrayList<>();
+ readerList1.forEach(x -> chunkReaderWrapList.add(new FakeChunkReaderWrap(x)));
+ readerList1 = engine.executeForIPointReader(queryId, chunkReaderWrapList);
+ PriorityMergeReader reader1 = new PriorityMergeReader(readerList1, 1);
+ PriorityMergeReader reader2 = new PriorityMergeReader(readerList2, 1);
check(reader1, reader2);
reader1.close();
@@ -96,8 +89,11 @@ public class ExternalSortEngineTest {
List<long[]> data = genData(lineCount, valueCount);
List<IPointReader> readerList1 = genReaders(data);
+ List<ChunkReaderWrap> chunkReaderWrapList = new ArrayList<>();
+ readerList1.forEach(x -> chunkReaderWrapList.add(new FakeChunkReaderWrap(x)));
+
long startTimestamp = System.currentTimeMillis();
- readerList1 = engine.executeWithGlobalTimeFilter(queryId, readerList1, 1);
+ readerList1 = engine.executeForIPointReader(queryId, chunkReaderWrapList);
PriorityMergeReader reader1 = new PriorityMergeReader();
for (int i = 0; i < readerList1.size(); i++) {
reader1.addReaderWithPriority(readerList1.get(i), i);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityReaderBean.java b/server/src/test/java/org/apache/iotdb/db/query/externalsort/FakeChunkReaderWrap.java
similarity index 62%
rename from server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityReaderBean.java
rename to server/src/test/java/org/apache/iotdb/db/query/externalsort/FakeChunkReaderWrap.java
index 226d5ea..751e9a9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityReaderBean.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/externalsort/FakeChunkReaderWrap.java
@@ -16,26 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-package org.apache.iotdb.db.query.reader.universal;
+package org.apache.iotdb.db.query.externalsort;
import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.db.query.reader.IReaderByTimestamp;
+import org.apache.iotdb.db.query.reader.chunkRelated.ChunkReaderWrap;
-public class PriorityReaderBean {
-
- private IPointReader reader;
- private int priority;
-
- public PriorityReaderBean(IPointReader reader, int priority) {
- this.reader = reader;
- this.priority = priority;
+public class FakeChunkReaderWrap extends ChunkReaderWrap {
+ private IPointReader pointReader;
+ public FakeChunkReaderWrap(IPointReader pointReader){
+ super();
+ this.pointReader = pointReader;
}
- public IPointReader getReader() {
- return reader;
+ @Override
+ public IPointReader getIPointReader() {
+ return pointReader;
}
- public int getPriority() {
- return priority;
+ @Override
+ public IReaderByTimestamp getIReaderByTimestamp() {
+ return null;
}
+
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest2.java b/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest2.java
index f2cc207..480aedc 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest2.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest2.java
@@ -20,13 +20,7 @@
package org.apache.iotdb.db.query.reader.universal;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.db.utils.TsPrimitiveType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.Assert;
import org.junit.Test;