You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/16 14:00:17 UTC

[iotdb] branch master updated: [IOTDB-3160] TsFile will be corrupted when flushing memtable appears OOM (#5892)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b1734a082a [IOTDB-3160] TsFile will be corrupted when flushing memtable appears OOM (#5892)
b1734a082a is described below

commit b1734a082a5dcedf07c27ba75d036b42be775a28
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Mon May 16 22:00:12 2022 +0800

    [IOTDB-3160] TsFile will be corrupted when flushing memtable appears OOM (#5892)
---
 .../iotdb/db/integration/IoTDBRestartIT.java       | 101 ++++++++++++++-------
 .../db/engine/storagegroup/TsFileProcessor.java    |  54 ++++++++---
 .../write/writer/RestorableTsFileIOWriter.java     |  37 +-------
 3 files changed, 108 insertions(+), 84 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
index 2b940cfe00..62e59bd64b 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -18,15 +18,20 @@
  */
 package org.apache.iotdb.db.integration;
 
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.itbase.category.LocalStandaloneTest;
 import org.apache.iotdb.jdbc.Config;
 
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
@@ -48,12 +53,19 @@ public class IoTDBRestartIT {
 
   private final Logger logger = LoggerFactory.getLogger(IoTDBRestartIT.class);
 
-  @Test
-  public void testRestart()
-      throws SQLException, ClassNotFoundException, IOException, StorageEngineException {
+  @Before
+  public void setUp() throws Exception {
     EnvironmentUtils.envSetUp();
     Class.forName(Config.JDBC_DRIVER_NAME);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
 
+  @Test
+  public void testRestart() throws SQLException, IOException, StorageEngineException {
     try (Connection connection =
             DriverManager.getConnection(
                 Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
@@ -99,16 +111,10 @@ public class IoTDBRestartIT {
         }
       }
     }
-
-    EnvironmentUtils.cleanEnv();
   }
 
   @Test
-  public void testRestartDelete()
-      throws SQLException, ClassNotFoundException, IOException, StorageEngineException {
-    EnvironmentUtils.envSetUp();
-    Class.forName(Config.JDBC_DRIVER_NAME);
-
+  public void testRestartDelete() throws SQLException, IOException, StorageEngineException {
     try (Connection connection =
             DriverManager.getConnection(
                 Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
@@ -169,16 +175,11 @@ public class IoTDBRestartIT {
         resultSet.close();
       }
     }
-
-    EnvironmentUtils.cleanEnv();
   }
 
   @Test
   public void testRestartQueryLargerThanEndTime()
       throws SQLException, ClassNotFoundException, IOException, StorageEngineException {
-    EnvironmentUtils.envSetUp();
-    Class.forName(Config.JDBC_DRIVER_NAME);
-
     try (Connection connection =
             DriverManager.getConnection(
                 Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
@@ -228,16 +229,11 @@ public class IoTDBRestartIT {
       }
       assertEquals(1, cnt);
     }
-
-    EnvironmentUtils.cleanEnv();
   }
 
   @Test
   public void testRestartEndTime()
       throws SQLException, ClassNotFoundException, IOException, StorageEngineException {
-    EnvironmentUtils.envSetUp();
-    Class.forName(Config.JDBC_DRIVER_NAME);
-
     try (Connection connection =
             DriverManager.getConnection(
                 Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
@@ -284,15 +280,10 @@ public class IoTDBRestartIT {
       }
       assertEquals(2, cnt);
     }
-
-    EnvironmentUtils.cleanEnv();
   }
 
   @Test
   public void testRecoverWALMismatchDataType() throws Exception {
-    EnvironmentUtils.envSetUp();
-    Class.forName(Config.JDBC_DRIVER_NAME);
-
     try (Connection connection =
             DriverManager.getConnection(
                 Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
@@ -324,15 +315,10 @@ public class IoTDBRestartIT {
         assertEquals(1, cnt);
       }
     }
-
-    EnvironmentUtils.cleanEnv();
   }
 
   @Test
   public void testRecoverWALDeleteSchema() throws Exception {
-    EnvironmentUtils.envSetUp();
-    Class.forName(Config.JDBC_DRIVER_NAME);
-
     try (Connection connection =
             DriverManager.getConnection(
                 Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
@@ -361,14 +347,10 @@ public class IoTDBRestartIT {
         assertEquals(1, cnt);
       }
     }
-
-    EnvironmentUtils.cleanEnv();
   }
 
   @Test
   public void testRecoverWALDeleteSchemaCheckResourceTime() throws Exception {
-    EnvironmentUtils.envSetUp();
-    Class.forName(Config.JDBC_DRIVER_NAME);
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
     int avgSeriesPointNumberThreshold = config.getAvgSeriesPointNumberThreshold();
     config.setAvgSeriesPointNumberThreshold(2);
@@ -411,6 +393,55 @@ public class IoTDBRestartIT {
     config.setAvgSeriesPointNumberThreshold(avgSeriesPointNumberThreshold);
     config.setSeqTsFileSize(tsFileSize);
     config.setUnSeqTsFileSize(unFsFileSize);
-    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testRecoverFromFlushMemTableError() throws Exception {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("insert into root.turbine1.d1(timestamp,s1,s2) values(1,1.1,2.2)");
+    }
+
+    // mock a flush error by clearing the working memtable and then calling addTextDataSize(1)
+    TsFileProcessor[] tsFileProcessors =
+        StorageEngine.getInstance()
+            .getProcessorByDataRegionId(new PartialPath("root.turbine1"), 0)
+            .getWorkSequenceTsFileProcessors()
+            .toArray(new TsFileProcessor[0]);
+    Assert.assertEquals(1, tsFileProcessors.length);
+    IMemTable memTable = tsFileProcessors[0].getWorkMemTable();
+    memTable.clear();
+    memTable.addTextDataSize(1);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("flush");
+    }
+
+    IoTDBDescriptor.getInstance().getConfig().setReadOnly(false);
+    EnvironmentUtils.restartDaemon();
+
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("select * from root.**");
+      assertTrue(hasResultSet);
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          Assert.assertEquals("1", resultSet.getString(1));
+          Assert.assertEquals("1.1", resultSet.getString(2));
+          Assert.assertEquals("2.2", resultSet.getString(3));
+          cnt++;
+        }
+        Assert.assertEquals(1, cnt);
+      }
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 01fa311687..8cac805ba2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -1191,6 +1191,22 @@ public class TsFileProcessor {
     }
   }
 
+  /** Releases the flushed memTable while holding its monitor, then notifies all its waiters. */
+  private void syncReleaseFlushedMemTable(IMemTable memTable) {
+    synchronized (memTable) {
+      releaseFlushedMemTable(memTable);
+      memTable.notifyAll();
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "{}: {} released a memtable (signal={}), flushingMemtables size ={}",
+            storageGroupName,
+            tsFileResource.getTsFile().getName(),
+            memTable.isSignalMemTable(),
+            flushingMemTables.size());
+      }
+    }
+  }
+
   /**
    * Take the first MemTable from the flushingMemTables and flush it. Called by a flush thread of
    * the flush manager pool
@@ -1236,7 +1252,29 @@ public class TsFileProcessor {
                 tsFileResource.getTsFile().getName(),
                 e1);
           }
-          Thread.currentThread().interrupt();
+          // release resource
+          try {
+            syncReleaseFlushedMemTable(memTableToFlush);
+            // make sure no query will search this file
+            tsFileResource.setTimeIndex(config.getTimeIndexLevel().getTimeIndex());
+            // this callback method will register this empty tsfile into TsFileManager
+            for (CloseFileListener closeFileListener : closeFileListeners) {
+              closeFileListener.onClosed(this);
+            }
+            // close writer
+            writer.close();
+            writer = null;
+            synchronized (flushingMemTables) {
+              flushingMemTables.notifyAll();
+            }
+          } catch (Exception e1) {
+            logger.error(
+                "{}: {} Release resource meets error",
+                storageGroupName,
+                tsFileResource.getTsFile().getName(),
+                e1);
+          }
+          return;
         }
       }
     }
@@ -1278,19 +1316,9 @@ public class TsFileProcessor {
           tsFileResource.getTsFile().getName(),
           memTableToFlush.isSignalMemTable());
     }
+
     // for sync flush
-    synchronized (memTableToFlush) {
-      releaseFlushedMemTable(memTableToFlush);
-      memTableToFlush.notifyAll();
-      if (logger.isDebugEnabled()) {
-        logger.debug(
-            "{}: {} released a memtable (signal={}), flushingMemtables size ={}",
-            storageGroupName,
-            tsFileResource.getTsFile().getName(),
-            memTableToFlush.isSignalMemTable(),
-            flushingMemTables.size());
-      }
-    }
+    syncReleaseFlushedMemTable(memTableToFlush);
 
     if (shouldClose && flushingMemTables.isEmpty() && writer != null) {
       try {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 7cabb92612..78253124b8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -75,42 +75,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
    * @throws IOException if write failed, or the file is broken but autoRepair==false.
    */
   public RestorableTsFileIOWriter(File file) throws IOException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("{} is opened.", file.getName());
-    }
-    this.file = file;
-    this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
-
-    // file doesn't exist
-    if (file.length() == 0) {
-      startFile();
-      crashed = true;
-      canWrite = true;
-      return;
-    }
-
-    if (file.exists()) {
-      try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
-
-        truncatedSize = reader.selfCheck(knownSchemas, chunkGroupMetadataList, true);
-        minPlanIndex = reader.getMinPlanIndex();
-        maxPlanIndex = reader.getMaxPlanIndex();
-        if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
-          crashed = false;
-          canWrite = false;
-          out.close();
-        } else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) {
-          out.close();
-          throw new NotCompatibleTsFileException(
-              String.format("%s is not in TsFile format.", file.getAbsolutePath()));
-        } else {
-          crashed = true;
-          canWrite = true;
-          // remove broken data
-          out.truncate(truncatedSize);
-        }
-      }
-    }
+    this(file, true);
   }
 
   public RestorableTsFileIOWriter(File file, boolean truncate) throws IOException {