You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/23 08:59:39 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated (91bef5f -> addc991)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from 91bef5f  fix file name comparing method
     new 1b55d6b  use modification file to filter chunk metadata when query unsealed file
     new addc991  add exception handle

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../db/engine/filenodeV2/FileNodeProcessorV2.java     | 17 ++++++++++++-----
 .../engine/filenodeV2/UnsealedTsFileProcessorV2.java  | 19 +++++++++++++++----
 ...ion.java => UnsealedTsFileProcessorException.java} | 14 +++++++-------
 .../db/engine/filenodeV2/FileNodeProcessorV2Test.java | 10 ++++++++--
 .../filenodeV2/UnsealedTsFileProcessorV2Test.java     | 10 +++++++---
 .../db/query/reader/sequence/SeqDataReaderTest.java   |  5 +++--
 .../query/reader/sequence/UnsealedSeqReaderTest.java  |  5 +++--
 7 files changed, 55 insertions(+), 25 deletions(-)
 copy iotdb/src/main/java/org/apache/iotdb/db/exception/{FileNodeProcessorException.java => UnsealedTsFileProcessorException.java} (68%)


[incubator-iotdb] 02/02: add exception handle

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit addc9916ccfa8331178d5482bda839fd1edd52c9
Author: qiaojialin <64...@qq.com>
AuthorDate: Sun Jun 23 16:59:37 2019 +0800

    add exception handle
---
 .../iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java | 17 ++++++++++++-----
 .../db/engine/filenodeV2/FileNodeProcessorV2Test.java   | 10 ++++++++--
 .../filenodeV2/UnsealedTsFileProcessorV2Test.java       | 10 +++++++---
 .../db/query/reader/sequence/SeqDataReaderTest.java     |  5 +++--
 .../db/query/reader/sequence/UnsealedSeqReaderTest.java |  5 +++--
 5 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index d700f85..3e2baed 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.FileNodeProcessorException;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -311,7 +312,8 @@ public class FileNodeProcessorV2 {
 
 
   // TODO need a read lock, please consider the concurrency with flush manager threads.
-  public QueryDataSourceV2 query(String deviceId, String measurementId) {
+  public QueryDataSourceV2 query(String deviceId, String measurementId)
+      throws FileNodeProcessorException {
     lock.readLock().lock();
     try {
       List<TsFileResourceV2> seqResources = getFileReSourceListForQuery(sequenceFileList,
@@ -329,7 +331,7 @@ public class FileNodeProcessorV2 {
    * @return fill unsealed tsfile resources with memory data and ChunkMetadataList of data in disk
    */
   private List<TsFileResourceV2> getFileReSourceListForQuery(List<TsFileResourceV2> tsFileResources,
-      String deviceId, String measurementId) {
+      String deviceId, String measurementId) throws FileNodeProcessorException {
 
     MeasurementSchema mSchema = fileSchema.getMeasurementSchema(measurementId);
     TSDataType dataType = mSchema.getType();
@@ -344,9 +346,14 @@ public class FileNodeProcessorV2 {
           if (tsFileResource.isClosed()) {
             tsfileResourcesForQuery.add(tsFileResource);
           } else {
-            Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = tsFileResource
-                .getUnsealedFileProcessor()
-                .query(deviceId, measurementId, dataType, mSchema.getProps());
+            Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = null;
+            try {
+              pair = tsFileResource
+                  .getUnsealedFileProcessor()
+                  .query(deviceId, measurementId, dataType, mSchema.getProps());
+            } catch (UnsealedTsFileProcessorException e) {
+              throw new FileNodeProcessorException(e);
+            }
             tsfileResourcesForQuery
                 .add(new TsFileResourceV2(tsFileResource.getFile(), pair.left, pair.right));
           }
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
index 8c7b761..4facad4 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.filenodeV2;
 
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
+import org.apache.iotdb.db.exception.FileNodeProcessorException;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -62,7 +63,12 @@ public class FileNodeProcessorV2Test {
     }
 
     processor.syncCloseFileNode();
-    QueryDataSourceV2 queryDataSource = processor.query(deviceId, measurementId);
+    QueryDataSourceV2 queryDataSource = null;
+    try {
+      queryDataSource = processor.query(deviceId, measurementId);
+    } catch (FileNodeProcessorException e) {
+      e.printStackTrace();
+    }
     Assert.assertEquals(queryDataSource.getSeqResources().size(), 100);
     for (TsFileResourceV2 resource : queryDataSource.getSeqResources()) {
       Assert.assertTrue(resource.isClosed());
@@ -71,7 +77,7 @@ public class FileNodeProcessorV2Test {
 
 
   @Test
-  public void testSeqAndUnSeqSyncClose() {
+  public void testSeqAndUnSeqSyncClose() throws FileNodeProcessorException {
 
     for (int j = 21; j <= 30; j++) {
       TSRecord record = new TSRecord(j, deviceId);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index 9dfb26a..71233a9 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -31,6 +31,7 @@ import java.util.Map.Entry;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SysTimeVersionController;
+import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.FileSchemaUtils;
@@ -68,7 +69,8 @@ public class UnsealedTsFileProcessorV2Test {
   }
 
   @Test
-  public void testWriteAndFlush() throws WriteProcessException, IOException {
+  public void testWriteAndFlush()
+      throws WriteProcessException, IOException, UnsealedTsFileProcessorException {
     processor = new UnsealedTsFileProcessorV2(storageGroup, new File(filePath),
         FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE, x->{}, ()-> true);
 
@@ -112,7 +114,8 @@ public class UnsealedTsFileProcessorV2Test {
 
 
   @Test
-  public void testMultiFlush() throws WriteProcessException, IOException {
+  public void testMultiFlush()
+      throws WriteProcessException, IOException, UnsealedTsFileProcessorException {
     processor = new UnsealedTsFileProcessorV2(storageGroup, new File(filePath),
         FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE, x->{}, ()->true);
 
@@ -144,7 +147,8 @@ public class UnsealedTsFileProcessorV2Test {
 
 
   @Test
-  public void testWriteAndClose() throws WriteProcessException, IOException {
+  public void testWriteAndClose()
+      throws WriteProcessException, IOException, UnsealedTsFileProcessorException {
     processor = new UnsealedTsFileProcessorV2(storageGroup, new File(filePath),
         FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE,
         unsealedTsFileProcessorV2 -> {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/sequence/SeqDataReaderTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/sequence/SeqDataReaderTest.java
index b458d74..831e192 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/sequence/SeqDataReaderTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/sequence/SeqDataReaderTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.reader.sequence;
 
 import java.io.IOException;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
+import org.apache.iotdb.db.exception.FileNodeProcessorException;
 import org.apache.iotdb.db.query.reader.ReaderTestHelper;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -30,7 +31,7 @@ import org.junit.Test;
 public class SeqDataReaderTest extends ReaderTestHelper {
 
   @Test
-  public void testSeqReader() throws IOException {
+  public void testSeqReader() throws IOException, FileNodeProcessorException {
     QueryDataSourceV2 queryDataSource = fileNodeProcessorV2.query(deviceId, measurementId);
     Path path = new Path(deviceId, measurementId);
     SequenceDataReaderV2 readerV2 = new SequenceDataReaderV2(path,
@@ -49,7 +50,7 @@ public class SeqDataReaderTest extends ReaderTestHelper {
   }
 
   @Test
-  public void testSeqByTimestampReader() throws IOException {
+  public void testSeqByTimestampReader() throws IOException, FileNodeProcessorException {
     QueryDataSourceV2 queryDataSource = fileNodeProcessorV2.query(deviceId, measurementId);
     Path path = new Path(deviceId, measurementId);
     SequenceDataReaderByTimestampV2 readerV2 = new SequenceDataReaderByTimestampV2(path,
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/sequence/UnsealedSeqReaderTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/sequence/UnsealedSeqReaderTest.java
index 9f0c9fa..8c13ec0 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/sequence/UnsealedSeqReaderTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/sequence/UnsealedSeqReaderTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.reader.sequence;
 import java.io.IOException;
 import org.apache.iotdb.db.engine.filenodeV2.TsFileResourceV2;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
+import org.apache.iotdb.db.exception.FileNodeProcessorException;
 import org.apache.iotdb.db.query.reader.ReaderTestHelper;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.junit.Assert;
@@ -29,7 +30,7 @@ import org.junit.Test;
 public class UnsealedSeqReaderTest extends ReaderTestHelper {
 
   @Test
-  public void testUnSealedReader() throws IOException {
+  public void testUnSealedReader() throws IOException, FileNodeProcessorException {
     QueryDataSourceV2 queryDataSource = fileNodeProcessorV2.query(deviceId, measurementId);
     TsFileResourceV2 resourceV2 = queryDataSource.getSeqResources().get(0);
     Assert.assertEquals(false, resourceV2.isClosed());
@@ -47,7 +48,7 @@ public class UnsealedSeqReaderTest extends ReaderTestHelper {
   }
 
   @Test
-  public void testUnSealedByTimestampReader() throws IOException {
+  public void testUnSealedByTimestampReader() throws IOException, FileNodeProcessorException {
     QueryDataSourceV2 queryDataSource = fileNodeProcessorV2.query(deviceId, measurementId);
     TsFileResourceV2 resourceV2 = queryDataSource.getSeqResources().get(0);
     Assert.assertEquals(false, resourceV2.isClosed());


[incubator-iotdb] 01/02: use modification file to filter chunk metadata when query unsealed file

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 1b55d6baafd1a7105fd0528c382008835e2533fb
Author: qiaojialin <64...@qq.com>
AuthorDate: Sun Jun 23 16:54:58 2019 +0800

    use modification file to filter chunk metadata when query unsealed file
---
 .../filenodeV2/UnsealedTsFileProcessorV2.java      | 19 ++++++++--
 .../UnsealedTsFileProcessorException.java          | 44 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 4 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 6b594e1..53bd1a5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -34,11 +35,15 @@ import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushTaskV2;
 import org.apache.iotdb.db.engine.memtable.MemTablePool;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -366,7 +371,8 @@ public class UnsealedTsFileProcessorV2 {
    * @return corresponding chunk data and chunk metadata in memory
    */
   public Pair<ReadOnlyMemChunk, List<ChunkMetaData>> query(String deviceId,
-      String measurementId, TSDataType dataType, Map<String, String> props) {
+      String measurementId, TSDataType dataType, Map<String, String> props)
+      throws UnsealedTsFileProcessorException {
     flushQueryLock.readLock().lock();
     try {
       MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
@@ -385,10 +391,15 @@ public class UnsealedTsFileProcessorV2 {
       // so we do not need to handle it again in the following readOnlyMemChunk
       ReadOnlyMemChunk timeValuePairSorter = new ReadOnlyMemChunk(dataType, memSeriesLazyMerger,
           Collections.emptyMap());
-      return new Pair<>(timeValuePairSorter,
-          writer.getVisibleMetadatas(deviceId, measurementId, dataType));
-      //RL: TODO 后面查询处理时认为这里返回的List<ChunkMetaData>是已经被mod处理过的?
 
+      ModificationFile modificationFile = tsFileResource.getModFile();
+
+      List<ChunkMetaData> chunkMetaDataList = writer.getVisibleMetadatas(deviceId, measurementId, dataType);
+      QueryUtils.modifyChunkMetaData(chunkMetaDataList, (List<Modification>) modificationFile.getModifications());
+
+      return new Pair<>(timeValuePairSorter, chunkMetaDataList);
+    } catch (IOException e) {
+      throw new UnsealedTsFileProcessorException("read modification file failed", e);
     } finally {
       flushQueryLock.readLock().unlock();
     }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/UnsealedTsFileProcessorException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/UnsealedTsFileProcessorException.java
new file mode 100644
index 0000000..72374e9
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/UnsealedTsFileProcessorException.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.exception;
+
+public class UnsealedTsFileProcessorException extends ProcessorException {
+  // Raised when an unsealed TsFile processor fails, e.g. its modification file cannot be read.
+  private static final long serialVersionUID = 3749107630243950925L;
+
+  public UnsealedTsFileProcessorException() {
+    super();
+  }
+
+  public UnsealedTsFileProcessorException(Exception pathExcp) {
+    super(pathExcp.getMessage(), pathExcp); // same message; cause kept for the stack trace
+  }
+
+  public UnsealedTsFileProcessorException(String msg) {
+    super(msg);
+  }
+
+  public UnsealedTsFileProcessorException(Throwable throwable) {
+    super(throwable);
+  }
+
+  public UnsealedTsFileProcessorException(String msg, Throwable e) {
+    super(msg, e);
+  }
+}