You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/11/08 09:44:33 UTC

[iotdb] branch master updated: [IOTDB-1613] Recover mods file if a delete write modification failed (#4334)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c36f2d  [IOTDB-1613] Recover mods file if a delete write modification failed (#4334)
0c36f2d is described below

commit 0c36f2d7638f46080eceb3cfaadf3581c66f4afd
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Mon Nov 8 17:44:02 2021 +0800

    [IOTDB-1613] Recover mods file if a delete write modification failed (#4334)
---
 .../db/engine/modification/ModificationFile.java   |  10 +-
 .../io/LocalTextModificationAccessor.java          |  29 +-
 .../engine/modification/io/ModificationReader.java |   3 +-
 .../engine/modification/io/ModificationWriter.java |   4 +-
 .../modification/utils/TracedBufferedReader.java   | 419 +++++++++++++++++++++
 .../engine/storagegroup/StorageGroupProcessor.java |  10 +-
 .../io/LocalTextModificationAccessorTest.java      |  49 +++
 7 files changed, 502 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
index a12b976..77a8431 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
@@ -49,6 +49,7 @@ public class ModificationFile implements AutoCloseable {
   public static final String FILE_SUFFIX = ".mods";
   public static final String COMPACTION_FILE_SUFFIX = ".compaction.mods";
 
+  // lazy loaded, set null when closed
   private List<Modification> modifications;
   private ModificationWriter writer;
   private ModificationReader reader;
@@ -90,8 +91,8 @@ public class ModificationFile implements AutoCloseable {
 
   public void abort() throws IOException {
     synchronized (this) {
-      if (!modifications.isEmpty()) {
-        writer.abort();
+      writer.abort();
+      if (modifications != null && !modifications.isEmpty()) {
         modifications.remove(modifications.size() - 1);
       }
     }
@@ -106,9 +107,10 @@ public class ModificationFile implements AutoCloseable {
    */
   public void write(Modification mod) throws IOException {
     synchronized (this) {
-      checkInit();
       writer.write(mod);
-      modifications.add(mod);
+      if (modifications != null) {
+        modifications.add(mod);
+      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
index 6160341..07b2d26 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.modification.io;
 
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.utils.TracedBufferedReader;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -28,9 +29,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
+import java.io.*;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -61,27 +60,43 @@ public class LocalTextModificationAccessor
 
   @Override
   public Collection<Modification> read() {
-    if (!FSFactoryProducer.getFSFactory().getFile(filePath).exists()) {
+    File file = FSFactoryProducer.getFSFactory().getFile(filePath);
+    if (!file.exists()) {
       logger.debug("No modification has been written to this file");
       return new ArrayList<>();
     }
 
     String line;
+    long truncatedSize = 0;
+    boolean crashed = false;
     List<Modification> modificationList = new ArrayList<>();
-    try (BufferedReader reader = FSFactoryProducer.getFSFactory().getBufferedReader(filePath)) {
+    try (TracedBufferedReader reader = new TracedBufferedReader(new FileReader(file))) {
       while ((line = reader.readLine()) != null) {
         if (line.equals(ABORT_MARK) && !modificationList.isEmpty()) {
           modificationList.remove(modificationList.size() - 1);
         } else {
           modificationList.add(decodeModification(line));
         }
+        truncatedSize = reader.position();
       }
     } catch (IOException e) {
+      crashed = true;
       logger.error(
-          "An error occurred when reading modifications, and the remaining modifications "
-              + "were ignored.",
+          "An error occurred when reading modifications, and the remaining modifications will be truncated to size {}.",
+          truncatedSize,
           e);
     }
+
+    if (crashed) {
+      try (FileOutputStream outputStream = new FileOutputStream(file, true)) {
+        outputStream.getChannel().truncate(truncatedSize);
+      } catch (FileNotFoundException e) {
+        logger.debug("No modification has been written to this file");
+      } catch (IOException e) {
+        logger.error(
+            "An error occurred when truncating modifications to size {}.", truncatedSize, e);
+      }
+    }
     return modificationList;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
index be1d457..df23037 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
@@ -28,7 +28,8 @@ import java.util.Collection;
 public interface ModificationReader {
 
   /**
-   * Read all modifications from a persistent medium.
+   * Read all modifications from a persistent medium. If the mods file is crashed, the redundant
+   * modifications will be truncated until the file is correct.
    *
    * @return a list of modifications contained the medium.
    */
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
index 2bef00d..64a5005 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
@@ -31,7 +31,7 @@ public interface ModificationWriter {
 
   /**
    * Write a new modification to the persistent medium. Notice that after calling write(), a
-   * fileWriter is opened,
+   * fileWriter is opened.
    *
    * @param mod the modification to be written.
    */
@@ -40,6 +40,6 @@ public interface ModificationWriter {
   /** Release resources like streams. */
   void close() throws IOException;
 
-  /** Abort last modification. */
+  /** Abort last modification. Notice that after calling abort(), a fileWriter is opened. */
   void abort() throws IOException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/utils/TracedBufferedReader.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/utils/TracedBufferedReader.java
new file mode 100644
index 0000000..2dce0b2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/utils/TracedBufferedReader.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification.utils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.io.UncheckedIOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Copied from {@link java.io.BufferedReader}, trace the read position by modifying the fill()
+ * method.
+ */
+public class TracedBufferedReader extends Reader {
+  private Reader in;
+
+  private char cb[];
+  private int nChars, nextChar;
+
+  private static final int INVALIDATED = -2;
+  private static final int UNMARKED = -1;
+  private int markedChar = UNMARKED;
+  private int readAheadLimit = 0; /* Valid only when markedChar > 0 */
+
+  /** If the next character is a line feed, skip it */
+  private boolean skipLF = false;
+
+  /** The skipLF flag when the mark was set */
+  private boolean markedSkipLF = false;
+
+  private static int defaultCharBufferSize = 8192;
+  private static int defaultExpectedLineLength = 80;
+
+  /** the total bytes number already filled into cb */
+  private long totalFilledBytesNum = 0;
+
+  /**
+   * Creates a buffering character-input stream that uses an input buffer of the specified size.
+   *
+   * @param in A Reader
+   * @param sz Input-buffer size
+   * @exception IllegalArgumentException If {@code sz <= 0}
+   */
+  public TracedBufferedReader(Reader in, int sz) {
+    super(in);
+    if (sz <= 0) throw new IllegalArgumentException("Buffer size <= 0");
+    this.in = in;
+    cb = new char[sz];
+    nextChar = nChars = 0;
+  }
+
+  /**
+   * Creates a buffering character-input stream that uses a default-sized input buffer.
+   *
+   * @param in A Reader
+   */
+  public TracedBufferedReader(Reader in) {
+    this(in, defaultCharBufferSize);
+  }
+
+  /** Checks to make sure that the stream has not been closed */
+  private void ensureOpen() throws IOException {
+    if (in == null) throw new IOException("Stream closed");
+  }
+
+  /** {@link BufferedReader#fill()} */
+  private void fill() throws IOException {
+    int dst;
+    if (markedChar <= UNMARKED) {
+      /* No mark */
+      dst = 0;
+    } else {
+      /* Marked */
+      int delta = nextChar - markedChar;
+      if (delta >= readAheadLimit) {
+        /* Gone past read-ahead limit: Invalidate mark */
+        markedChar = INVALIDATED;
+        readAheadLimit = 0;
+        dst = 0;
+      } else {
+        if (readAheadLimit <= cb.length) {
+          /* Shuffle in the current buffer */
+          System.arraycopy(cb, markedChar, cb, 0, delta);
+          markedChar = 0;
+          dst = delta;
+        } else {
+          /* Reallocate buffer to accommodate read-ahead limit */
+          char ncb[] = new char[readAheadLimit];
+          System.arraycopy(cb, markedChar, ncb, 0, delta);
+          cb = ncb;
+          markedChar = 0;
+          dst = delta;
+        }
+        nextChar = nChars = delta;
+      }
+    }
+
+    int n;
+    do {
+      n = in.read(cb, dst, cb.length - dst);
+    } while (n == 0);
+    if (n > 0) {
+      nChars = dst + n;
+      nextChar = dst;
+      totalFilledBytesNum = totalFilledBytesNum + n;
+    }
+  }
+
+  /** {@link BufferedReader#read()} */
+  public int read() throws IOException {
+    synchronized (lock) {
+      ensureOpen();
+      for (; ; ) {
+        if (nextChar >= nChars) {
+          fill();
+          if (nextChar >= nChars) return -1;
+        }
+        if (skipLF) {
+          skipLF = false;
+          if (cb[nextChar] == '\n') {
+            nextChar++;
+            continue;
+          }
+        }
+        return cb[nextChar++];
+      }
+    }
+  }
+
+  /** {@link BufferedReader#read1(char[], int, int)} */
+  private int read1(char[] cbuf, int off, int len) throws IOException {
+    if (nextChar >= nChars) {
+      /* If the requested length is at least as large as the buffer, and
+      if there is no mark/reset activity, and if line feeds are not
+      being skipped, do not bother to copy the characters into the
+      local buffer.  In this way buffered streams will cascade
+      harmlessly. */
+      if (len >= cb.length && markedChar <= UNMARKED && !skipLF) {
+        return in.read(cbuf, off, len);
+      }
+      fill();
+    }
+    if (nextChar >= nChars) return -1;
+    if (skipLF) {
+      skipLF = false;
+      if (cb[nextChar] == '\n') {
+        nextChar++;
+        if (nextChar >= nChars) fill();
+        if (nextChar >= nChars) return -1;
+      }
+    }
+    int n = Math.min(len, nChars - nextChar);
+    System.arraycopy(cb, nextChar, cbuf, off, n);
+    nextChar += n;
+    return n;
+  }
+
+  /** {@link BufferedReader#read(char[], int, int)} */
+  public int read(char cbuf[], int off, int len) throws IOException {
+    synchronized (lock) {
+      ensureOpen();
+      if ((off < 0)
+          || (off > cbuf.length)
+          || (len < 0)
+          || ((off + len) > cbuf.length)
+          || ((off + len) < 0)) {
+        throw new IndexOutOfBoundsException();
+      } else if (len == 0) {
+        return 0;
+      }
+
+      int n = read1(cbuf, off, len);
+      if (n <= 0) return n;
+      while ((n < len) && in.ready()) {
+        int n1 = read1(cbuf, off + n, len - n);
+        if (n1 <= 0) break;
+        n += n1;
+      }
+      return n;
+    }
+  }
+
+  /** {@link BufferedReader#readLine(boolean)} */
+  String readLine(boolean ignoreLF) throws IOException {
+    StringBuilder s = null;
+    int startChar;
+
+    synchronized (lock) {
+      ensureOpen();
+      boolean omitLF = ignoreLF || skipLF;
+
+      bufferLoop:
+      for (; ; ) {
+
+        if (nextChar >= nChars) fill();
+        if (nextChar >= nChars) {
+          /* EOF */
+          if (s != null && s.length() > 0) return s.toString();
+          else return null;
+        }
+        boolean eol = false;
+        char c = 0;
+        int i;
+
+        /* Skip a leftover '\n', if necessary */
+        if (omitLF && (cb[nextChar] == '\n')) nextChar++;
+        skipLF = false;
+        omitLF = false;
+
+        charLoop:
+        for (i = nextChar; i < nChars; i++) {
+          c = cb[i];
+          if ((c == '\n') || (c == '\r')) {
+            eol = true;
+            break charLoop;
+          }
+        }
+
+        startChar = nextChar;
+        nextChar = i;
+
+        if (eol) {
+          String str;
+          if (s == null) {
+            str = new String(cb, startChar, i - startChar);
+          } else {
+            s.append(cb, startChar, i - startChar);
+            str = s.toString();
+          }
+          nextChar++;
+          if (c == '\r') {
+            skipLF = true;
+            if (read() != -1) {
+              nextChar--;
+            }
+          }
+          return str;
+        }
+
+        if (s == null) s = new StringBuilder(defaultExpectedLineLength);
+        s.append(cb, startChar, i - startChar);
+      }
+    }
+  }
+
+  /** {@link BufferedReader#readLine()} */
+  public String readLine() throws IOException {
+    return readLine(false);
+  }
+
+  /** {@link BufferedReader#skip(long)} */
+  public long skip(long n) throws IOException {
+    if (n < 0L) {
+      throw new IllegalArgumentException("skip value is negative");
+    }
+    synchronized (lock) {
+      ensureOpen();
+      long r = n;
+      while (r > 0) {
+        if (nextChar >= nChars) fill();
+        if (nextChar >= nChars) /* EOF */ break;
+        if (skipLF) {
+          skipLF = false;
+          if (cb[nextChar] == '\n') {
+            nextChar++;
+          }
+        }
+        long d = nChars - nextChar;
+        if (r <= d) {
+          nextChar += r;
+          r = 0;
+          break;
+        } else {
+          r -= d;
+          nextChar = nChars;
+        }
+      }
+      return n - r;
+    }
+  }
+
+  /** {@link BufferedReader#ready()} */
+  public boolean ready() throws IOException {
+    synchronized (lock) {
+      ensureOpen();
+
+      /*
+       * If newline needs to be skipped and the next char to be read
+       * is a newline character, then just skip it right away.
+       */
+      if (skipLF) {
+        /* Note that in.ready() will return true if and only if the next
+         * read on the stream will not block.
+         */
+        if (nextChar >= nChars && in.ready()) {
+          fill();
+        }
+        if (nextChar < nChars) {
+          if (cb[nextChar] == '\n') nextChar++;
+          skipLF = false;
+        }
+      }
+      return (nextChar < nChars) || in.ready();
+    }
+  }
+
+  /** {@link BufferedReader#markSupported()} */
+  public boolean markSupported() {
+    return true;
+  }
+
+  /** {@link BufferedReader#mark(int)} */
+  public void mark(int readAheadLimit) throws IOException {
+    if (readAheadLimit < 0) {
+      throw new IllegalArgumentException("Read-ahead limit < 0");
+    }
+    synchronized (lock) {
+      ensureOpen();
+      this.readAheadLimit = readAheadLimit;
+      markedChar = nextChar;
+      markedSkipLF = skipLF;
+    }
+  }
+
+  /** {@link BufferedReader#reset()} */
+  public void reset() throws IOException {
+    synchronized (lock) {
+      ensureOpen();
+      if (markedChar < 0)
+        throw new IOException((markedChar == INVALIDATED) ? "Mark invalid" : "Stream not marked");
+      nextChar = markedChar;
+      skipLF = markedSkipLF;
+    }
+  }
+
+  /** {@link BufferedReader#close()} */
+  public void close() throws IOException {
+    synchronized (lock) {
+      if (in == null) return;
+      try {
+        in.close();
+      } finally {
+        in = null;
+        cb = null;
+      }
+    }
+  }
+
+  /** {@link BufferedReader#lines()} */
+  public Stream<String> lines() {
+    Iterator<String> iter =
+        new Iterator<String>() {
+          String nextLine = null;
+
+          @Override
+          public boolean hasNext() {
+            if (nextLine != null) {
+              return true;
+            } else {
+              try {
+                nextLine = readLine();
+                return (nextLine != null);
+              } catch (IOException e) {
+                throw new UncheckedIOException(e);
+              }
+            }
+          }
+
+          @Override
+          public String next() {
+            if (nextLine != null || hasNext()) {
+              String line = nextLine;
+              nextLine = null;
+              return line;
+            } else {
+              throw new NoSuchElementException();
+            }
+          }
+        };
+    return StreamSupport.stream(
+        Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED | Spliterator.NONNULL),
+        false);
+  }
+
+  /**
+   * Returns this reader's file position.
+   *
+   * @return This reader's file position, a non-negative integer counting the number of bytes from
+   *     the beginning of the file to the current position
+   */
+  public long position() {
+    // position = totalFilledBytesNum - lastFilledBytesNum + readOffsetInLastFilledBytes
+    // lastFilledBytesNum = nChars - dst, readOffsetInLastFilledBytes = nextChar - dst
+    return totalFilledBytesNum - nChars + nextChar;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index f35a7d2..b4cebdb 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1939,14 +1939,6 @@ public class StorageGroupProcessor {
     try {
       Set<PartialPath> devicePaths = IoTDB.metaManager.getBelongedDevices(path);
       for (PartialPath device : devicePaths) {
-        Long lastUpdateTime = null;
-        for (Map<String, Long> latestTimeMap : latestTimeForEachDevice.values()) {
-          Long curTime = latestTimeMap.get(device.getFullPath());
-          if (curTime != null && (lastUpdateTime == null || lastUpdateTime < curTime)) {
-            lastUpdateTime = curTime;
-          }
-        }
-
         // delete Last cache record if necessary
         tryToDeleteLastCache(device, path, startTime, endTime);
       }
@@ -1975,6 +1967,8 @@ public class StorageGroupProcessor {
       // roll back
       for (ModificationFile modFile : updatedModFiles) {
         modFile.abort();
+        // remember to close mod file
+        modFile.close();
       }
       throw new IOException(e);
     } finally {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
index bfd4dc6..e7c81b3 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
@@ -26,7 +26,9 @@ import org.apache.iotdb.db.metadata.PartialPath;
 
 import org.junit.Test;
 
+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -78,4 +80,51 @@ public class LocalTextModificationAccessorTest {
     Collection<Modification> modifications = accessor.read();
     assertEquals(new ArrayList<>(), modifications);
   }
+
+  @Test
+  public void readAndTruncate() {
+    String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
+    File file = new File(tempFileName);
+    if (file.exists()) {
+      file.delete();
+    }
+    Modification[] modifications =
+        new Modification[] {
+          new Deletion(new PartialPath(new String[] {"d1", "s1"}), 1, 1),
+          new Deletion(new PartialPath(new String[] {"d1", "s2"}), 2, 2),
+          new Deletion(new PartialPath(new String[] {"d1", "s3"}), 3, 3),
+          new Deletion(new PartialPath(new String[] {"d1", "s4"}), 4, 4),
+        };
+    try (LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(tempFileName);
+        BufferedWriter writer = new BufferedWriter(new FileWriter(tempFileName, true))) {
+      // write normal message
+      for (int i = 0; i < 2; i++) {
+        accessor.write(modifications[i]);
+      }
+      List<Modification> modificationList = (List<Modification>) accessor.read();
+      for (int i = 0; i < 2; i++) {
+        assertEquals(modifications[i], modificationList.get(i));
+      }
+      // write error message
+      long length = file.length();
+      writer.write("error");
+      writer.newLine();
+      writer.flush();
+      // write normal message & read
+      for (int i = 2; i < 4; i++) {
+        accessor.write(modifications[i]);
+      }
+      modificationList = (List<Modification>) accessor.read();
+      for (int i = 0; i < 2; i++) {
+        System.out.println(modificationList);
+        assertEquals(modifications[i], modificationList.get(i));
+      }
+      // check truncated file
+      assertEquals(length, file.length());
+    } catch (IOException e) {
+      fail(e.getMessage());
+    } finally {
+      file.delete();
+    }
+  }
 }