You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/16 14:00:17 UTC
[iotdb] branch master updated: [IOTDB-3160] TsFile will be corrupted when flushing memtable appears OOM (#5892)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b1734a082a [IOTDB-3160] TsFile will be corrupted when flushing memtable appears OOM (#5892)
b1734a082a is described below
commit b1734a082a5dcedf07c27ba75d036b42be775a28
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Mon May 16 22:00:12 2022 +0800
[IOTDB-3160] TsFile will be corrupted when flushing memtable appears OOM (#5892)
---
.../iotdb/db/integration/IoTDBRestartIT.java | 101 ++++++++++++++-------
.../db/engine/storagegroup/TsFileProcessor.java | 54 ++++++++---
.../write/writer/RestorableTsFileIOWriter.java | 37 +-------
3 files changed, 108 insertions(+), 84 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
index 2b940cfe00..62e59bd64b 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -18,15 +18,20 @@
*/
package org.apache.iotdb.db.integration;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.itbase.category.LocalStandaloneTest;
import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
@@ -48,12 +53,19 @@ public class IoTDBRestartIT {
private final Logger logger = LoggerFactory.getLogger(IoTDBRestartIT.class);
- @Test
- public void testRestart()
- throws SQLException, ClassNotFoundException, IOException, StorageEngineException {
+ @Before
+ public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvironmentUtils.cleanEnv();
+ }
+ @Test
+ public void testRestart() throws SQLException, IOException, StorageEngineException {
try (Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
@@ -99,16 +111,10 @@ public class IoTDBRestartIT {
}
}
}
-
- EnvironmentUtils.cleanEnv();
}
@Test
- public void testRestartDelete()
- throws SQLException, ClassNotFoundException, IOException, StorageEngineException {
- EnvironmentUtils.envSetUp();
- Class.forName(Config.JDBC_DRIVER_NAME);
-
+ public void testRestartDelete() throws SQLException, IOException, StorageEngineException {
try (Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
@@ -169,16 +175,11 @@ public class IoTDBRestartIT {
resultSet.close();
}
}
-
- EnvironmentUtils.cleanEnv();
}
@Test
public void testRestartQueryLargerThanEndTime()
throws SQLException, ClassNotFoundException, IOException, StorageEngineException {
- EnvironmentUtils.envSetUp();
- Class.forName(Config.JDBC_DRIVER_NAME);
-
try (Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
@@ -228,16 +229,11 @@ public class IoTDBRestartIT {
}
assertEquals(1, cnt);
}
-
- EnvironmentUtils.cleanEnv();
}
@Test
public void testRestartEndTime()
throws SQLException, ClassNotFoundException, IOException, StorageEngineException {
- EnvironmentUtils.envSetUp();
- Class.forName(Config.JDBC_DRIVER_NAME);
-
try (Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
@@ -284,15 +280,10 @@ public class IoTDBRestartIT {
}
assertEquals(2, cnt);
}
-
- EnvironmentUtils.cleanEnv();
}
@Test
public void testRecoverWALMismatchDataType() throws Exception {
- EnvironmentUtils.envSetUp();
- Class.forName(Config.JDBC_DRIVER_NAME);
-
try (Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
@@ -324,15 +315,10 @@ public class IoTDBRestartIT {
assertEquals(1, cnt);
}
}
-
- EnvironmentUtils.cleanEnv();
}
@Test
public void testRecoverWALDeleteSchema() throws Exception {
- EnvironmentUtils.envSetUp();
- Class.forName(Config.JDBC_DRIVER_NAME);
-
try (Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
@@ -361,14 +347,10 @@ public class IoTDBRestartIT {
assertEquals(1, cnt);
}
}
-
- EnvironmentUtils.cleanEnv();
}
@Test
public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception {
- EnvironmentUtils.envSetUp();
- Class.forName(Config.JDBC_DRIVER_NAME);
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
int avgSeriesPointNumberThreshold = config.getAvgSeriesPointNumberThreshold();
config.setAvgSeriesPointNumberThreshold(2);
@@ -411,6 +393,55 @@ public class IoTDBRestartIT {
config.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold);
config.setSeqTsFileSize(tsFileSize);
config.setUnSeqTsFileSize(unFsFileSize);
- EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void testRecoverFromFlushMemTableError() throws Exception {
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("insert into root.turbine1.d1(timestamp,s1,s2) values(1,1.1,2.2)");
+ }
+
+ // mock exception
+ TsFileProcessor[] tsFileProcessors =
+ StorageEngine.getInstance()
+ .getProcessorByDataRegionId(new PartialPath("root.turbine1"), 0)
+ .getWorkSequenceTsFileProcessors()
+ .toArray(new TsFileProcessor[0]);
+ Assert.assertEquals(1, tsFileProcessors.length);
+ IMemTable memTable = tsFileProcessors[0].getWorkMemTable();
+ memTable.clear();
+ memTable.addTextDataSize(1);
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("flush");
+ }
+
+ IoTDBDescriptor.getInstance().getConfig().setReadOnly(false);
+ EnvironmentUtils.restartDaemon();
+
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet = statement.execute("select * from root.**");
+ assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ int cnt = 0;
+ while (resultSet.next()) {
+ Assert.assertEquals("1", resultSet.getString(1));
+ Assert.assertEquals("1.1", resultSet.getString(2));
+ Assert.assertEquals("2.2", resultSet.getString(3));
+ cnt++;
+ }
+ Assert.assertEquals(1, cnt);
+ }
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 01fa311687..8cac805ba2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -1191,6 +1191,22 @@ public class TsFileProcessor {
}
}
+ /** This method will synchronize the memTable and release its flushing resources */
+ private void syncReleaseFlushedMemTable(IMemTable memTable) {
+ synchronized (memTable) {
+ releaseFlushedMemTable(memTable);
+ memTable.notifyAll();
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "{}: {} released a memtable (signal={}), flushingMemtables size ={}",
+ storageGroupName,
+ tsFileResource.getTsFile().getName(),
+ memTable.isSignalMemTable(),
+ flushingMemTables.size());
+ }
+ }
+ }
+
/**
* Take the first MemTable from the flushingMemTables and flush it. Called by a flush thread of
* the flush manager pool
@@ -1236,7 +1252,29 @@ public class TsFileProcessor {
tsFileResource.getTsFile().getName(),
e1);
}
- Thread.currentThread().interrupt();
+ // release resource
+ try {
+ syncReleaseFlushedMemTable(memTableToFlush);
+ // make sure no query will search this file
+ tsFileResource.setTimeIndex(config.getTimeIndexLevel().getTimeIndex());
+ // this callback method will register this empty tsfile into TsFileManager
+ for (CloseFileListener closeFileListener : closeFileListeners) {
+ closeFileListener.onClosed(this);
+ }
+ // close writer
+ writer.close();
+ writer = null;
+ synchronized (flushingMemTables) {
+ flushingMemTables.notifyAll();
+ }
+ } catch (Exception e1) {
+ logger.error(
+ "{}: {} Release resource meets error",
+ storageGroupName,
+ tsFileResource.getTsFile().getName(),
+ e1);
+ }
+ return;
}
}
}
@@ -1278,19 +1316,9 @@ public class TsFileProcessor {
tsFileResource.getTsFile().getName(),
memTableToFlush.isSignalMemTable());
}
+
// for sync flush
- synchronized (memTableToFlush) {
- releaseFlushedMemTable(memTableToFlush);
- memTableToFlush.notifyAll();
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{}: {} released a memtable (signal={}), flushingMemtables size ={}",
- storageGroupName,
- tsFileResource.getTsFile().getName(),
- memTableToFlush.isSignalMemTable(),
- flushingMemTables.size());
- }
- }
+ syncReleaseFlushedMemTable(memTableToFlush);
if (shouldClose && flushingMemTables.isEmpty() && writer != null) {
try {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 7cabb92612..78253124b8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -75,42 +75,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
* @throws IOException if write failed, or the file is broken but autoRepair==false.
*/
public RestorableTsFileIOWriter(File file) throws IOException {
- if (logger.isDebugEnabled()) {
- logger.debug("{} is opened.", file.getName());
- }
- this.file = file;
- this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
-
- // file doesn't exist
- if (file.length() == 0) {
- startFile();
- crashed = true;
- canWrite = true;
- return;
- }
-
- if (file.exists()) {
- try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
-
- truncatedSize = reader.selfCheck(knownSchemas, chunkGroupMetadataList, true);
- minPlanIndex = reader.getMinPlanIndex();
- maxPlanIndex = reader.getMaxPlanIndex();
- if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
- crashed = false;
- canWrite = false;
- out.close();
- } else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) {
- out.close();
- throw new NotCompatibleTsFileException(
- String.format("%s is not in TsFile format.", file.getAbsolutePath()));
- } else {
- crashed = true;
- canWrite = true;
- // remove broken data
- out.truncate(truncatedSize);
- }
- }
- }
+ this(file, true);
}
public RestorableTsFileIOWriter(File file, boolean truncate) throws IOException {