You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/07/15 01:48:39 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-3618] StorageEngine failed to recover: / by zero (#6672)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 5680b42e16 [To rel/0.13][IOTDB-3618] StorageEngine failed to recover: / by zero (#6672)
5680b42e16 is described below
commit 5680b42e16978d0533a24cd020b8499c34edda4b
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Fri Jul 15 09:48:33 2022 +0800
[To rel/0.13][IOTDB-3618] StorageEngine failed to recover: / by zero (#6672)
---
.../iotdb/db/qp/physical/crud/InsertPlan.java | 9 +++
.../iotdb/db/writelog/recover/LogReplayer.java | 21 ++----
.../iotdb/db/writelog/recover/LogReplayerTest.java | 10 ---
.../db/writelog/recover/SeqTsFileRecoverTest.java | 87 ++++++++++++++++++++++
4 files changed, 101 insertions(+), 26 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index d1943ab576..fb47d0befd 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -118,6 +118,15 @@ public abstract class InsertPlan extends PhysicalPlan {
return failedIndices == null ? Collections.emptyList() : failedIndices;
}
+ /** Returns whether at least one element of {@code measurements} is non-null. */
+ public boolean hasValidMeasurements() {
+ boolean anyNonNull = false;
+ for (Object measurement : measurements) {
+ if (measurement != null) {
+ // One non-null entry is enough; stop scanning.
+ anyNonNull = true;
+ break;
+ }
+ }
+ return anyNonNull;
+ }
+
public boolean isAligned() {
return isAligned;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 313eff12c3..9d410ad048 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -53,7 +53,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@@ -74,9 +73,6 @@ public class LogReplayer {
// only unsequence file tolerates duplicated data
private boolean sequence;
- private Map<String, Long> tempStartTimeMap = new HashMap<>();
- private Map<String, Long> tempEndTimeMap = new HashMap<>();
-
public LogReplayer(
String logNodePrefix,
String insertFilePath,
@@ -163,14 +159,15 @@ public class LogReplayer {
private void replayInsert(
InsertPlan plan, VirtualStorageGroupProcessor virtualStorageGroupProcessor)
throws WriteProcessException, QueryProcessException {
+ if (!plan.hasValidMeasurements()) {
+ return;
+ }
if (currentTsFileResource != null) {
- long minTime, maxTime;
+ long minTime;
if (plan instanceof InsertRowPlan) {
minTime = ((InsertRowPlan) plan).getTime();
- maxTime = ((InsertRowPlan) plan).getTime();
} else {
- minTime = ((InsertTabletPlan) plan).getMinTime();
- maxTime = ((InsertTabletPlan) plan).getMaxTime();
+ minTime = plan.getMinTime();
}
String deviceId =
plan.isAligned()
@@ -181,14 +178,6 @@ public class LogReplayer {
if (lastEndTime != Long.MIN_VALUE && lastEndTime >= minTime && sequence) {
return;
}
- Long startTime = tempStartTimeMap.get(deviceId);
- if (startTime == null || startTime > minTime) {
- tempStartTimeMap.put(deviceId, minTime);
- }
- Long endTime = tempEndTimeMap.get(deviceId);
- if (endTime == null || endTime < maxTime) {
- tempEndTimeMap.put(deviceId, maxTime);
- }
}
plan.setMeasurementMNodes(new IMeasurementMNode[plan.getMeasurements().length]);
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index 618eb05af7..4d31752c27 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -407,19 +407,9 @@ public class LogReplayerTest {
((Binary[]) columns[4])[(int) r] = Binary.valueOf(r + "");
}
- // BitMap[] bitMaps = new BitMap[dataTypes.size()];
- // for (int i = 0; i < dataTypes.size(); i++) {
- // if (bitMaps[i] == null) {
- // bitMaps[i] = new BitMap(times.length);
- // }
- // // mark value of time=99 as null
- // bitMaps[i].mark(99);
- // }
-
insertTabletPlan.setTimes(times);
insertTabletPlan.setColumns(columns);
insertTabletPlan.setRowCount(times.length);
- // insertTabletPlan.setBitMaps(bitMaps);
return insertTabletPlan;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index d467a376c1..02716dfaf3 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.MmapUtil;
@@ -388,6 +389,66 @@ public class SeqTsFileRecoverTest {
resource = new TsFileResource(tsF);
}
+ /**
+ * Prepares a WAL node that only contains an InsertRowPlan and an InsertTabletPlan with null
+ * values, i.e. plans whose single measurement has been marked as failed. This type of physical
+ * plan will be generated when inserting mismatched type data.
+ *
+ * @throws Exception if creating the file, the timeseries or the WAL entries fails
+ */
+ private void prepareNullInsertRowPlan() throws Exception {
+ if (!tsF.exists()) {
+ tsF.createNewFile();
+ }
+ node =
+ MultiFileLogNodeManager.getInstance()
+ .getNode(
+ logNodePrefix + tsF.getName(),
+ () -> {
+ // Two direct buffers, each half of the configured WAL buffer size.
+ ByteBuffer[] buffers = new ByteBuffer[2];
+ buffers[0] =
+ ByteBuffer.allocateDirect(
+ IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ buffers[1] =
+ ByteBuffer.allocateDirect(
+ IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ return buffers;
+ });
+ IoTDB.metaManager.createTimeseries(
+ new PartialPath("root.sg.device1.sensor1"),
+ TSDataType.INT64,
+ TSEncoding.PLAIN,
+ TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ Collections.emptyMap());
+ // Row plan whose only measurement is marked as failed before it is logged.
+ InsertRowPlan insertRowPlan =
+ new InsertRowPlan(
+ new PartialPath("root.sg.device1"),
+ 50,
+ new String[] {"sensor1"},
+ new TSDataType[] {TSDataType.INT64},
+ new String[] {"1"});
+ insertRowPlan.markFailedMeasurementInsertion(0, new Exception());
+ node.write(insertRowPlan);
+
+ // One-row tablet plan for the same series, also marked as failed before it is logged.
+ InsertTabletPlan insertTabletPlan =
+ new InsertTabletPlan(
+ new PartialPath("root.sg.device1"),
+ new String[] {"sensor1"},
+ Collections.singletonList(TSDataType.INT64.ordinal()));
+ long[] times = new long[] {1};
+ // The INT64 column must be a long[]; storing a boxed Integer in columns[0] would not match
+ // the declared data type.
+ Object[] columns = new Object[] {new long[] {1}};
+ insertTabletPlan.setTimes(times);
+ insertTabletPlan.setColumns(columns);
+ insertTabletPlan.setRowCount(times.length);
+ insertTabletPlan.setStart(0);
+ insertTabletPlan.setEnd(1);
+ insertTabletPlan.markFailedMeasurementInsertion(0, new Exception());
+ node.write(insertTabletPlan);
+
+ node.notifyStartFlush();
+ resource = new TsFileResource(tsF);
+ }
+
@Test
public void testNonLastRecovery()
throws StorageGroupProcessorException, IOException, MetadataException, WriteProcessException {
@@ -562,4 +623,30 @@ public class SeqTsFileRecoverTest {
assertEquals(Long.MAX_VALUE, resource.getStartTime("root.sg.device4"));
assertEquals(Long.MIN_VALUE, resource.getEndTime("root.sg.device4"));
}
+
+ @Test
+ public void testRecoverNullInsertRowPlan() throws Exception {
+ // Log only insert plans whose single measurement has been marked as failed.
+ prepareNullInsertRowPlan();
+ TsFileRecoverPerformer recoverPerformer =
+ new TsFileRecoverPerformer(logNodePrefix, resource, false, true, null);
+ RestorableTsFileIOWriter writer =
+ recoverPerformer.recover(
+ true,
+ () -> {
+ // Two direct buffers, each half of the configured WAL buffer size.
+ ByteBuffer[] walBuffers = new ByteBuffer[2];
+ for (int i = 0; i < walBuffers.length; i++) {
+ walBuffers[i] =
+ ByteBuffer.allocateDirect(
+ IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2);
+ }
+ return walBuffers;
+ },
+ (ByteBuffer[] usedBuffers) -> {
+ for (ByteBuffer usedBuffer : usedBuffers) {
+ MmapUtil.clean((MappedByteBuffer) usedBuffer);
+ }
+ });
+ // After recovery the resource is expected to contain no devices.
+ Assert.assertEquals(0, resource.getDevices().size());
+ }
}