You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/11/08 09:44:33 UTC
[iotdb] branch master updated: [IOTDB-1613] Recover mods file if a
delete write modification failed (#4334)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0c36f2d [IOTDB-1613] Recover mods file if a delete write modification failed (#4334)
0c36f2d is described below
commit 0c36f2d7638f46080eceb3cfaadf3581c66f4afd
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Mon Nov 8 17:44:02 2021 +0800
[IOTDB-1613] Recover mods file if a delete write modification failed (#4334)
---
.../db/engine/modification/ModificationFile.java | 10 +-
.../io/LocalTextModificationAccessor.java | 29 +-
.../engine/modification/io/ModificationReader.java | 3 +-
.../engine/modification/io/ModificationWriter.java | 4 +-
.../modification/utils/TracedBufferedReader.java | 419 +++++++++++++++++++++
.../engine/storagegroup/StorageGroupProcessor.java | 10 +-
.../io/LocalTextModificationAccessorTest.java | 49 +++
7 files changed, 502 insertions(+), 22 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
index a12b976..77a8431 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
@@ -49,6 +49,7 @@ public class ModificationFile implements AutoCloseable {
public static final String FILE_SUFFIX = ".mods";
public static final String COMPACTION_FILE_SUFFIX = ".compaction.mods";
+ // lazy loaded, set null when closed
private List<Modification> modifications;
private ModificationWriter writer;
private ModificationReader reader;
@@ -90,8 +91,8 @@ public class ModificationFile implements AutoCloseable {
public void abort() throws IOException {
synchronized (this) {
- if (!modifications.isEmpty()) {
- writer.abort();
+ writer.abort();
+ if (modifications != null && !modifications.isEmpty()) {
modifications.remove(modifications.size() - 1);
}
}
@@ -106,9 +107,10 @@ public class ModificationFile implements AutoCloseable {
*/
public void write(Modification mod) throws IOException {
synchronized (this) {
- checkInit();
writer.write(mod);
- modifications.add(mod);
+ if (modifications != null) { // the list is lazily loaded; only keep it in sync once it exists
+ modifications.add(mod);
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
index 6160341..07b2d26 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.modification.io;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.utils.TracedBufferedReader;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -28,9 +29,7 @@ import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.IOException;
+import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -61,27 +60,43 @@ public class LocalTextModificationAccessor
@Override
public Collection<Modification> read() {
- if (!FSFactoryProducer.getFSFactory().getFile(filePath).exists()) {
+ File file = FSFactoryProducer.getFSFactory().getFile(filePath);
+ if (!file.exists()) {
logger.debug("No modification has been written to this file");
return new ArrayList<>();
}
String line;
+ long truncatedSize = 0;
+ boolean crashed = false;
List<Modification> modificationList = new ArrayList<>();
- try (BufferedReader reader = FSFactoryProducer.getFSFactory().getBufferedReader(filePath)) {
+ try (TracedBufferedReader reader = new TracedBufferedReader(new FileReader(file))) {
while ((line = reader.readLine()) != null) {
if (line.equals(ABORT_MARK) && !modificationList.isEmpty()) {
modificationList.remove(modificationList.size() - 1);
} else {
modificationList.add(decodeModification(line));
}
+ truncatedSize = reader.position(); // position just past the last line handled without error
}
} catch (IOException e) {
+ crashed = true;
logger.error(
- "An error occurred when reading modifications, and the remaining modifications "
- + "were ignored.",
+ "An error occurred when reading modifications, and the remaining modifications will be truncated to size {}.",
+ truncatedSize,
e);
}
+
+ if (crashed) {
+ try (FileOutputStream outputStream = new FileOutputStream(file, true)) {
+ outputStream.getChannel().truncate(truncatedSize); // NOTE(review): position() counts chars, truncate() takes bytes; equal only for single-byte text - confirm
+ } catch (FileNotFoundException e) {
+ logger.debug("No modification has been written to this file");
+ } catch (IOException e) {
+ logger.error(
+ "An error occurred when truncating modifications to size {}.", truncatedSize, e);
+ }
+ }
return modificationList;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
index be1d457..df23037 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
@@ -28,7 +28,8 @@ import java.util.Collection;
public interface ModificationReader {
/**
- * Read all modifications from a persistent medium.
+ * Read all modifications from a persistent medium. If the mods file is corrupted, the content
+ * after the last successfully read modification is truncated to make the file valid again.
*
* @return a list of modifications contained the medium.
*/
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
index 2bef00d..64a5005 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
@@ -31,7 +31,7 @@ public interface ModificationWriter {
/**
* Write a new modification to the persistent medium. Notice that after calling write(), a
- * fileWriter is opened,
+ * fileWriter is opened.
*
* @param mod the modification to be written.
*/
@@ -40,6 +40,6 @@ public interface ModificationWriter {
/** Release resources like streams. */
void close() throws IOException;
- /** Abort last modification. */
+ /** Abort last modification. Notice that after calling abort(), a fileWriter is opened. */
void abort() throws IOException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/modification/utils/TracedBufferedReader.java b/server/src/main/java/org/apache/iotdb/db/engine/modification/utils/TracedBufferedReader.java
new file mode 100644
index 0000000..2dce0b2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/modification/utils/TracedBufferedReader.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification.utils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.io.UncheckedIOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Copied from {@link java.io.BufferedReader}, trace the read position by modifying the fill()
+ * method.
+ */
+public class TracedBufferedReader extends Reader {
+ private Reader in;
+
+ private char cb[];
+ private int nChars, nextChar;
+
+ private static final int INVALIDATED = -2;
+ private static final int UNMARKED = -1;
+ private int markedChar = UNMARKED;
+ private int readAheadLimit = 0; /* Valid only when markedChar > 0 */
+
+ /** If the next character is a line feed, skip it */
+ private boolean skipLF = false;
+
+ /** The skipLF flag when the mark was set */
+ private boolean markedSkipLF = false;
+
+ private static int defaultCharBufferSize = 8192;
+ private static int defaultExpectedLineLength = 80;
+
+ /** char count (chars, not bytes, despite the name) accumulated so far; basis of position() */
+ private long totalFilledBytesNum = 0;
+
+ /**
+ * Builds a traced, buffering character reader on top of {@code in} with an explicit buffer size.
+ *
+ * @param in the reader to wrap
+ * @param sz capacity of the internal char buffer
+ * @exception IllegalArgumentException If {@code sz <= 0}
+ */
+ public TracedBufferedReader(Reader in, int sz) {
+   super(in);
+   if (sz <= 0) {
+     throw new IllegalArgumentException("Buffer size <= 0");
+   }
+   this.in = in;
+   this.cb = new char[sz];
+   this.nChars = 0;
+   this.nextChar = 0;
+ }
+
+ /**
+ * Creates a buffering character-input stream that uses a default-sized input buffer, i.e. a
+ * buffer of {@code defaultCharBufferSize} chars.
+ *
+ * @param in A Reader
+ */
+ public TracedBufferedReader(Reader in) {
+ this(in, defaultCharBufferSize);
+ }
+
+ /** Fails with an {@link IOException} when the wrapped reader reference is {@code null}. */
+ private void ensureOpen() throws IOException {
+   if (in != null) {
+     return;
+   }
+   throw new IOException("Stream closed");
+ }
+
+ /**
+ * Refills {@code cb} from the underlying reader; same logic as {@link BufferedReader#fill()},
+ * and additionally adds the number of chars read to {@code totalFilledBytesNum}, which
+ * {@link #position()} relies on. When a still-valid mark exists, the chars between the mark and
+ * {@code nextChar} are kept at the front of the buffer (reallocating it to
+ * {@code readAheadLimit} chars if the limit exceeds the current capacity); otherwise the buffer
+ * is overwritten from index 0. When the underlying reader reports end of stream, the counter is
+ * left unchanged.
+ */
+ private void fill() throws IOException {
+ int dst;
+ if (markedChar <= UNMARKED) {
+ /* No mark */
+ dst = 0;
+ } else {
+ /* Marked */
+ int delta = nextChar - markedChar;
+ if (delta >= readAheadLimit) {
+ /* Gone past read-ahead limit: Invalidate mark */
+ markedChar = INVALIDATED;
+ readAheadLimit = 0;
+ dst = 0;
+ } else {
+ if (readAheadLimit <= cb.length) {
+ /* Shuffle in the current buffer */
+ System.arraycopy(cb, markedChar, cb, 0, delta);
+ markedChar = 0;
+ dst = delta;
+ } else {
+ /* Reallocate buffer to accommodate read-ahead limit */
+ char ncb[] = new char[readAheadLimit];
+ System.arraycopy(cb, markedChar, ncb, 0, delta);
+ cb = ncb;
+ markedChar = 0;
+ dst = delta;
+ }
+ nextChar = nChars = delta;
+ }
+ }
+
+ int n;
+ do {
+ n = in.read(cb, dst, cb.length - dst);
+ } while (n == 0);
+ if (n > 0) {
+ nChars = dst + n;
+ nextChar = dst;
+ totalFilledBytesNum = totalFilledBytesNum + n; // n is a char count, not a byte count
+ }
+ }
+
+ /**
+ * Reads a single character; same logic as {@link BufferedReader#read()}.
+ *
+ * @return the character read, or -1 if the end of the stream has been reached
+ * @throws IOException if the stream is closed or the underlying reader fails
+ */
+ @Override
+ public int read() throws IOException {
+   synchronized (lock) {
+     ensureOpen();
+     for (; ; ) {
+       if (nextChar >= nChars) {
+         fill();
+         if (nextChar >= nChars) {
+           return -1;
+         }
+       }
+       if (skipLF) {
+         skipLF = false;
+         if (cb[nextChar] == '\n') {
+           // swallow the '\n' that follows a '\r' and look at the next char instead
+           nextChar++;
+           continue;
+         }
+       }
+       return cb[nextChar++];
+     }
+   }
+ }
+
+ /**
+ * Reads up to {@code len} chars into {@code cbuf}, refilling the internal buffer when needed;
+ * same logic as {@link BufferedReader#read1(char[], int, int)}, except that chars read directly
+ * from the underlying reader (bypassing {@code cb}) are also added to
+ * {@code totalFilledBytesNum}. Without that, {@link #position()} would lag behind after a read
+ * that takes the direct path.
+ *
+ * @param cbuf destination buffer
+ * @param off offset in {@code cbuf} at which to start storing chars
+ * @param len maximum number of chars to read
+ * @return the number of chars read, or -1 if the end of the stream has been reached
+ * @throws IOException if the underlying reader fails
+ */
+ private int read1(char[] cbuf, int off, int len) throws IOException {
+   if (nextChar >= nChars) {
+     /* If the requested length is at least as large as the buffer, and
+     if there is no mark/reset activity, and if line feeds are not
+     being skipped, do not bother to copy the characters into the
+     local buffer. In this way buffered streams will cascade
+     harmlessly. */
+     if (len >= cb.length && markedChar <= UNMARKED && !skipLF) {
+       int direct = in.read(cbuf, off, len);
+       if (direct > 0) {
+         // fill() is skipped on this path, so the traced position must be advanced here
+         totalFilledBytesNum += direct;
+       }
+       return direct;
+     }
+     fill();
+   }
+   if (nextChar >= nChars) {
+     return -1;
+   }
+   if (skipLF) {
+     skipLF = false;
+     if (cb[nextChar] == '\n') {
+       nextChar++;
+       if (nextChar >= nChars) {
+         fill();
+       }
+       if (nextChar >= nChars) {
+         return -1;
+       }
+     }
+   }
+   int n = Math.min(len, nChars - nextChar);
+   System.arraycopy(cb, nextChar, cbuf, off, n);
+   nextChar += n;
+   return n;
+ }
+
+ /**
+ * Reads chars into a portion of an array; same logic as
+ * {@link BufferedReader#read(char[], int, int)}.
+ *
+ * @param cbuf destination buffer
+ * @param off offset in {@code cbuf} at which to start storing chars
+ * @param len maximum number of chars to read
+ * @return the number of chars read, 0 if {@code len} is 0, or -1 at end of stream
+ * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not describe a valid range of
+ *     {@code cbuf}
+ * @throws IOException if the stream is closed or the underlying reader fails
+ */
+ @Override
+ public int read(char[] cbuf, int off, int len) throws IOException {
+   synchronized (lock) {
+     ensureOpen();
+     if ((off < 0)
+         || (off > cbuf.length)
+         || (len < 0)
+         || ((off + len) > cbuf.length)
+         || ((off + len) < 0)) {
+       throw new IndexOutOfBoundsException();
+     } else if (len == 0) {
+       return 0;
+     }
+
+     int n = read1(cbuf, off, len);
+     if (n <= 0) {
+       return n;
+     }
+     // keep reading only while the underlying reader reports more data is ready
+     while ((n < len) && in.ready()) {
+       int n1 = read1(cbuf, off + n, len - n);
+       if (n1 <= 0) {
+         break;
+       }
+       n += n1;
+     }
+     return n;
+   }
+ }
+
+ /**
+ * Reads one line of text, terminated by '\n', '\r' or end of stream; based on
+ * {@link BufferedReader#readLine(boolean)}. The line terminator is not part of the returned
+ * string.
+ *
+ * <p>In this implementation, after a line ends with '\r', {@link #read()} is called right away
+ * with {@code skipLF} set, so a directly following '\n' is consumed immediately; if that call
+ * returned a char, {@code nextChar} is stepped back by one so the char is read again later.
+ * Presumably this is done so that {@link #position()} already points past a complete "\r\n"
+ * when this method returns.
+ *
+ * @param ignoreLF if true, a '\n' at the very start of the data is skipped
+ * @return the line content, or {@code null} if the end of the stream was reached without reading
+ * any characters
+ * @throws IOException if the stream is closed or the underlying reader fails
+ */
+ String readLine(boolean ignoreLF) throws IOException {
+ StringBuilder s = null;
+ int startChar;
+
+ synchronized (lock) {
+ ensureOpen();
+ boolean omitLF = ignoreLF || skipLF;
+
+ bufferLoop:
+ for (; ; ) {
+
+ if (nextChar >= nChars) fill();
+ if (nextChar >= nChars) {
+ /* EOF */
+ if (s != null && s.length() > 0) return s.toString();
+ else return null;
+ }
+ boolean eol = false;
+ char c = 0;
+ int i;
+
+ /* Skip a leftover '\n', if necessary */
+ if (omitLF && (cb[nextChar] == '\n')) nextChar++;
+ skipLF = false;
+ omitLF = false;
+
+ charLoop:
+ for (i = nextChar; i < nChars; i++) {
+ c = cb[i];
+ if ((c == '\n') || (c == '\r')) {
+ eol = true;
+ break charLoop;
+ }
+ }
+
+ startChar = nextChar;
+ nextChar = i;
+
+ if (eol) {
+ String str;
+ if (s == null) {
+ str = new String(cb, startChar, i - startChar);
+ } else {
+ s.append(cb, startChar, i - startChar);
+ str = s.toString();
+ }
+ nextChar++;
+ if (c == '\r') {
+ skipLF = true;
+ if (read() != -1) {
+ nextChar--;
+ }
+ }
+ return str;
+ }
+
+ if (s == null) s = new StringBuilder(defaultExpectedLineLength);
+ s.append(cb, startChar, i - startChar);
+ }
+ }
+ }
+
+ /**
+ * Reads one line of text; counterpart of {@link BufferedReader#readLine()}. Delegates to
+ * {@code readLine(false)}.
+ *
+ * @return the line content without its terminator, or {@code null} at end of stream
+ * @throws IOException if the stream is closed or the underlying reader fails
+ */
+ public String readLine() throws IOException {
+ return readLine(false);
+ }
+
+ /**
+ * Skips characters; same logic as {@link BufferedReader#skip(long)}. Skipped chars are loaded
+ * through {@code fill()}, so they are reflected in {@link #position()}.
+ *
+ * @param n the number of characters to skip
+ * @return the number of characters actually skipped
+ * @throws IllegalArgumentException if {@code n} is negative
+ * @throws IOException if the stream is closed or the underlying reader fails
+ */
+ @Override
+ public long skip(long n) throws IOException {
+   if (n < 0L) {
+     throw new IllegalArgumentException("skip value is negative");
+   }
+   synchronized (lock) {
+     ensureOpen();
+     long r = n;
+     while (r > 0) {
+       if (nextChar >= nChars) {
+         fill();
+       }
+       if (nextChar >= nChars) {
+         /* EOF */
+         break;
+       }
+       if (skipLF) {
+         skipLF = false;
+         if (cb[nextChar] == '\n') {
+           nextChar++;
+         }
+       }
+       long d = nChars - nextChar;
+       if (r <= d) {
+         nextChar += r;
+         r = 0;
+         break;
+       } else {
+         r -= d;
+         nextChar = nChars;
+       }
+     }
+     return n - r;
+   }
+ }
+
+ /**
+ * Tells whether this stream is ready to be read; same logic as {@link BufferedReader#ready()}.
+ *
+ * @return true if the buffer still holds unread chars or the underlying reader is ready
+ * @throws IOException if the stream is closed or the underlying reader fails
+ */
+ @Override
+ public boolean ready() throws IOException {
+   synchronized (lock) {
+     ensureOpen();
+
+     /*
+      * If newline needs to be skipped and the next char to be read
+      * is a newline character, then just skip it right away.
+      */
+     if (skipLF) {
+       /* Note that in.ready() will return true if and only if the next
+        * read on the stream will not block.
+        */
+       if (nextChar >= nChars && in.ready()) {
+         fill();
+       }
+       if (nextChar < nChars) {
+         if (cb[nextChar] == '\n') {
+           nextChar++;
+         }
+         skipLF = false;
+       }
+     }
+     return (nextChar < nChars) || in.ready();
+   }
+ }
+
+ /**
+ * Tells whether this stream supports {@link #mark(int)}; counterpart of
+ * {@link BufferedReader#markSupported()}.
+ *
+ * @return always {@code true}
+ */
+ @Override
+ public boolean markSupported() {
+   return true;
+ }
+
+ /**
+ * Marks the present position in the stream; same logic as {@link BufferedReader#mark(int)}.
+ *
+ * @param readAheadLimit limit on the number of chars that may be read while still preserving the
+ *     mark
+ * @throws IllegalArgumentException if {@code readAheadLimit} is negative
+ * @throws IOException if the stream is closed
+ */
+ @Override
+ public void mark(int readAheadLimit) throws IOException {
+   if (readAheadLimit < 0) {
+     throw new IllegalArgumentException("Read-ahead limit < 0");
+   }
+   synchronized (lock) {
+     ensureOpen();
+     this.readAheadLimit = readAheadLimit;
+     markedChar = nextChar;
+     // remember the pending-LF state so reset() restores it together with the position
+     markedSkipLF = skipLF;
+   }
+ }
+
+ /**
+ * Resets the stream to the most recent mark; same logic as {@link BufferedReader#reset()}.
+ *
+ * @throws IOException if the stream is closed, was never marked, or the mark has been
+ *     invalidated
+ */
+ @Override
+ public void reset() throws IOException {
+   synchronized (lock) {
+     ensureOpen();
+     if (markedChar < 0) {
+       throw new IOException((markedChar == INVALIDATED) ? "Mark invalid" : "Stream not marked");
+     }
+     nextChar = markedChar;
+     skipLF = markedSkipLF;
+   }
+ }
+
+ /**
+ * Closes the underlying reader and releases the buffer; same logic as
+ * {@link BufferedReader#close()}. Calling it again after the stream is closed has no effect.
+ *
+ * @throws IOException if closing the underlying reader fails
+ */
+ @Override
+ public void close() throws IOException {
+   synchronized (lock) {
+     if (in == null) {
+       return;
+     }
+     try {
+       in.close();
+     } finally {
+       // drop the references even if close() threw, so the stream is treated as closed
+       in = null;
+       cb = null;
+     }
+   }
+ }
+
+ /**
+ * Returns a sequential, ordered stream of the lines obtained from {@link #readLine()};
+ * counterpart of {@link BufferedReader#lines()}. Lines are read on demand as the stream is
+ * consumed, and an {@link IOException} raised while reading is rethrown wrapped in an
+ * {@link UncheckedIOException}.
+ *
+ * @return a stream of the remaining lines
+ */
+ public Stream<String> lines() {
+ Iterator<String> iter =
+ new Iterator<String>() {
+ String nextLine = null;
+
+ @Override
+ public boolean hasNext() {
+ if (nextLine != null) {
+ return true;
+ } else {
+ try {
+ nextLine = readLine();
+ return (nextLine != null);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+
+ @Override
+ public String next() {
+ if (nextLine != null || hasNext()) {
+ String line = nextLine;
+ nextLine = null;
+ return line;
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+ };
+ return StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED | Spliterator.NONNULL),
+ false);
+ }
+
+ /**
+ * Returns this reader's current read position.
+ *
+ * <p>NOTE: the value is measured in chars, not bytes. It is derived from the char counts
+ * returned by the underlying reader and from indexes into the char buffer, so it equals the
+ * byte offset in a file only when every char is encoded as exactly one byte.
+ *
+ * @return the number of chars consumed so far, counted from the beginning of the input; a
+ * non-negative value
+ */
+ public long position() {
+ // position = totalFilledBytesNum - lastFilledBytesNum + readOffsetInLastFilledBytes
+ // lastFilledBytesNum = nChars - dst, readOffsetInLastFilledBytes = nextChar - dst
+ return totalFilledBytesNum - nChars + nextChar;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index f35a7d2..b4cebdb 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1939,14 +1939,6 @@ public class StorageGroupProcessor {
try {
Set<PartialPath> devicePaths = IoTDB.metaManager.getBelongedDevices(path);
for (PartialPath device : devicePaths) {
- Long lastUpdateTime = null;
- for (Map<String, Long> latestTimeMap : latestTimeForEachDevice.values()) {
- Long curTime = latestTimeMap.get(device.getFullPath());
- if (curTime != null && (lastUpdateTime == null || lastUpdateTime < curTime)) {
- lastUpdateTime = curTime;
- }
- }
-
// delete Last cache record if necessary
tryToDeleteLastCache(device, path, startTime, endTime);
}
@@ -1975,6 +1967,8 @@ public class StorageGroupProcessor {
// roll back
for (ModificationFile modFile : updatedModFiles) {
modFile.abort();
+ // remember to close mod file
+ modFile.close();
}
throw new IOException(e);
} finally {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
index bfd4dc6..e7c81b3 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
@@ -26,7 +26,9 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.junit.Test;
+import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -78,4 +80,51 @@ public class LocalTextModificationAccessorTest {
Collection<Modification> modifications = accessor.read();
assertEquals(new ArrayList<>(), modifications);
}
+
+ /**
+ * Verifies that reading a mods file containing an undecodable line returns only the
+ * modifications written before that line and truncates the file back to the size it had before
+ * the bad line was appended.
+ */
+ @Test
+ public void readAndTruncate() {
+   String tempFileName = TestConstant.BASE_OUTPUT_PATH.concat("mod.temp");
+   File file = new File(tempFileName);
+   if (file.exists()) {
+     file.delete();
+   }
+   Modification[] modifications =
+       new Modification[] {
+         new Deletion(new PartialPath(new String[] {"d1", "s1"}), 1, 1),
+         new Deletion(new PartialPath(new String[] {"d1", "s2"}), 2, 2),
+         new Deletion(new PartialPath(new String[] {"d1", "s3"}), 3, 3),
+         new Deletion(new PartialPath(new String[] {"d1", "s4"}), 4, 4),
+       };
+   try (LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(tempFileName);
+       BufferedWriter writer = new BufferedWriter(new FileWriter(tempFileName, true))) {
+     // write normal message
+     for (int i = 0; i < 2; i++) {
+       accessor.write(modifications[i]);
+     }
+     List<Modification> modificationList = new ArrayList<>(accessor.read());
+     assertEquals(2, modificationList.size());
+     for (int i = 0; i < 2; i++) {
+       assertEquals(modifications[i], modificationList.get(i));
+     }
+     // write error message
+     long length = file.length();
+     writer.write("error");
+     writer.newLine();
+     writer.flush();
+     // write normal message & read
+     for (int i = 2; i < 4; i++) {
+       accessor.write(modifications[i]);
+     }
+     modificationList = new ArrayList<>(accessor.read());
+     // everything from the corrupted line onwards must be dropped, not just the bad line
+     assertEquals(2, modificationList.size());
+     for (int i = 0; i < 2; i++) {
+       assertEquals(modifications[i], modificationList.get(i));
+     }
+     // check truncated file
+     assertEquals(length, file.length());
+   } catch (IOException e) {
+     fail(e.getMessage());
+   } finally {
+     file.delete();
+   }
+ }
}