You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/02/21 07:08:59 UTC
[incubator-iotdb] branch master updated: Fix add future for flush
operation (#43)
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b534412 Fix add future for flush operation (#43)
b534412 is described below
commit b534412aba1cf4143995e578c593aae7402bb190
Author: Xiangdong Huang <hx...@qq.com>
AuthorDate: Thu Feb 21 15:08:55 2019 +0800
Fix add future for flush operation (#43)
* add Future for async flushing memtable, replace FlushStatus by Future
* using flushMemTable!=null to determine whether a flushTask is running rather than using future.isDone()
* replace assertEquals with assertTrue or assertFalse; add logger.error for some exceptions; change the time unit of lastFlushTime to nanoseconds
---
.../java/org/apache/iotdb/db/engine/Processor.java | 8 +-
.../engine/bufferwrite/BufferWriteProcessor.java | 181 +++++++++---------
.../db/engine/filenode/FileNodeFlushFuture.java | 91 +++++++++
.../iotdb/db/engine/filenode/FileNodeManager.java | 4 +-
.../db/engine/filenode/FileNodeProcessor.java | 12 +-
.../db/engine/overflow/io/OverflowProcessor.java | 211 +++++++++++----------
.../apache/iotdb/db/engine/pool/FlushManager.java | 10 +-
.../apache/iotdb/db/qp/constant/DatetimeUtils.java | 6 +
.../ImmediateFuture.java} | 43 +++--
.../org/apache/iotdb/db/engine/ProcessorTest.java | 7 +-
.../bufferwrite/BufferWriteProcessorNewTest.java | 37 ++--
.../bufferwrite/BufferWriteProcessorTest.java | 48 +++--
.../engine/overflow/io/OverflowProcessorTest.java | 1 -
13 files changed, 421 insertions(+), 238 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
index 57e1523..cba7463 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine;
import java.io.IOException;
+import java.util.concurrent.Future;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
@@ -170,7 +171,12 @@ public abstract class Processor {
*/
public abstract boolean canBeClosed();
- public abstract boolean flush() throws IOException;
+ /**
+ * call flush operation asynchronously
+ * @return a future that yields true if the flush succeeds, otherwise false.
+ * @throws IOException
+ */
+ public abstract Future<Boolean> flush() throws IOException;
/**
* Close the processor.<br>
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 13dcf57..e2cf818 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -20,10 +20,9 @@ package org.apache.iotdb.db.engine.bufferwrite;
import java.io.File;
import java.io.IOException;
-import java.time.Instant;
-import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
@@ -39,9 +38,10 @@ import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.pool.FlushManager;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.utils.FlushStatus;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.BufferWriteProcessorException;
+import org.apache.iotdb.db.qp.constant.DatetimeUtils;
+import org.apache.iotdb.db.utils.ImmediateFuture;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
@@ -60,8 +60,7 @@ public class BufferWriteProcessor extends Processor {
private static final Logger LOGGER = LoggerFactory.getLogger(BufferWriteProcessor.class);
private RestorableTsFileIOWriter writer;
private FileSchema fileSchema;
- private volatile FlushStatus flushStatus = new FlushStatus();
- private volatile boolean isFlush;
+ private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true);
private ReentrantLock flushQueryLock = new ReentrantLock();
private AtomicLong memSize = new AtomicLong();
private long memThreshold = TSFileDescriptor.getInstance().getConfig().groupSizeInByte;
@@ -71,6 +70,7 @@ public class BufferWriteProcessor extends Processor {
private Action bufferwriteCloseAction;
private Action filenodeFlushAction;
+ //lastFlushTime time unit: nanosecond
private long lastFlushTime = -1;
private long valueCount = 0;
@@ -224,7 +224,7 @@ public class BufferWriteProcessor extends Processor {
flushQueryLock.lock();
try {
MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
- if (isFlush) {
+ if (flushMemTable != null) {
memSeriesLazyMerger.addMemSeries(flushMemTable.query(deviceId, measurementId, dataType));
}
memSeriesLazyMerger.addMemSeries(workMemTable.query(deviceId, measurementId, dataType));
@@ -244,7 +244,6 @@ public class BufferWriteProcessor extends Processor {
workMemTable = new PrimitiveMemTable();
}
} finally {
- isFlush = true;
flushQueryLock.unlock();
}
}
@@ -256,15 +255,24 @@ public class BufferWriteProcessor extends Processor {
flushMemTable = null;
writer.appendMetadata();
} finally {
- isFlush = false;
flushQueryLock.unlock();
}
}
- private void flushOperation(String flushFunction, long version) {
+
+ /**
+ * The caller must guarantee that no other caller enters this function concurrently.
+ *
+ * @param displayMessage message that will appear in system log.
+ * @param version the operation version that will be tagged on the to-be-flushed memtable
+ * (i.e., ChunkGroup)
+ * @return true if the flush succeeded, otherwise false.
+ */
+ private boolean flushTask(String displayMessage, long version) {
+ boolean result;
long flushStartTime = System.currentTimeMillis();
LOGGER.info("The bufferwrite processor {} starts flushing {}.", getProcessorName(),
- flushFunction);
+ displayMessage);
try {
if (flushMemTable != null && !flushMemTable.isEmpty()) {
// flush data
@@ -278,64 +286,56 @@ public class BufferWriteProcessor extends Processor {
if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
logNode.notifyEndFlush(null);
}
- } catch (IOException e) {
- LOGGER.error("The bufferwrite processor {} failed to flush {}.", getProcessorName(),
- flushFunction, e);
+ result = true;
} catch (Exception e) {
LOGGER.error(
"The bufferwrite processor {} failed to flush {}, when calling the filenodeFlushAction.",
- getProcessorName(), flushFunction, e);
+ getProcessorName(), displayMessage, e);
+ result = false;
} finally {
- synchronized (flushStatus) {
- flushStatus.setUnFlushing();
- switchFlushToWork();
- flushStatus.notifyAll();
- LOGGER.info("The bufferwrite processor {} ends flushing {}.", getProcessorName(),
- flushFunction);
- }
+ switchFlushToWork();
+ LOGGER.info("The bufferwrite processor {} ends flushing {}.", getProcessorName(),
+ displayMessage);
+ }
+ if (LOGGER.isInfoEnabled()) {
+ long flushEndTime = System.currentTimeMillis();
+ LOGGER.info(
+ "The bufferwrite processor {} flush {}, start time is {}, flush end time is {}, "
+ + "flush time consumption is {}ms",
+ getProcessorName(), displayMessage,
+ DatetimeUtils.convertMillsecondToZonedDateTime(flushStartTime),
+ DatetimeUtils.convertMillsecondToZonedDateTime(flushEndTime),
+ flushEndTime - flushStartTime);
}
- long flushEndTime = System.currentTimeMillis();
- long flushInterval = flushEndTime - flushStartTime;
- ZonedDateTime startDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushStartTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- ZonedDateTime endDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushEndTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- LOGGER.info(
- "The bufferwrite processor {} flush {}, start time is {}, flush end time is {}, "
- + "flush time consumption is {}ms",
- getProcessorName(), flushFunction, startDateTime, endDateTime, flushInterval);
+ return result;
}
- private Future<?> flush(boolean synchronization) throws IOException {
+ // keyword synchronized is added in this method, so that only one flush task can be submitted now.
+ @Override
+ public synchronized Future<Boolean> flush() throws IOException {
// statistic information for flush
if (lastFlushTime > 0) {
- long thisFlushTime = System.currentTimeMillis();
- long flushTimeInterval = thisFlushTime - lastFlushTime;
- ZonedDateTime lastDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(lastFlushTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- ZonedDateTime thisDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(thisFlushTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- LOGGER.info(
- "The bufferwrite processor {}: last flush time is {}, this flush time is {}, "
- + "flush time interval is {}s",
- getProcessorName(), lastDateTime, thisDateTime, flushTimeInterval / 1000);
+ if (LOGGER.isInfoEnabled()) {
+ long thisFlushTime = System.currentTimeMillis();
+ LOGGER.info(
+ "The bufferwrite processor {}: last flush time is {}, this flush time is {}, "
+ + "flush time interval is {}s", getProcessorName(),
+ DatetimeUtils.convertMillsecondToZonedDateTime(lastFlushTime / 1000),
+ DatetimeUtils.convertMillsecondToZonedDateTime(thisFlushTime),
+ (thisFlushTime - lastFlushTime / 1000) / 1000);
+ }
}
- lastFlushTime = System.currentTimeMillis();
+ lastFlushTime = System.nanoTime();
// check value count
if (valueCount > 0) {
// waiting for the end of last flush operation.
- synchronized (flushStatus) {
- while (flushStatus.isFlushing()) {
- try {
- flushStatus.wait();
- } catch (InterruptedException e) {
- LOGGER.error(
- "Encounter an interrupt error when waitting for the flushing, "
- + "the bufferwrite processor is {}.",
- getProcessorName(), e);
- Thread.currentThread().interrupt();
- }
- }
+ try {
+ flushFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.error("Encounter an interrupt error when waitting for the flushing, "
+ + "the bufferwrite processor is {}.",
+ getProcessorName(), e);
+ Thread.currentThread().interrupt();
}
// update the lastUpdatetime, prepare for flush
try {
@@ -348,26 +348,17 @@ public class BufferWriteProcessor extends Processor {
logNode.notifyStartFlush();
}
valueCount = 0;
- flushStatus.setFlushing();
switchWorkToFlush();
long version = versionController.nextVersion();
BasicMemController.getInstance().reportFree(this, memSize.get());
memSize.set(0);
// switch
- if (synchronization) {
- flushOperation("synchronously", version);
- } else {
- FlushManager.getInstance().submit(() -> flushOperation("asynchronously", version));
- }
+ flushFuture = FlushManager.getInstance().submit(() -> flushTask("asynchronously",
+ version));
+ } else {
+ flushFuture = new ImmediateFuture<>(true);
}
- // TODO return a meaningful Future
- return null;
- }
-
- @Override
- public boolean flush() throws IOException {
- flush(false);
- return false;
+ return flushFuture;
}
@Override
@@ -379,8 +370,8 @@ public class BufferWriteProcessor extends Processor {
public void close() throws BufferWriteProcessorException {
try {
long closeStartTime = System.currentTimeMillis();
- // flush data
- flush(true);
+ // flush data and wait for finishing flush
+ flush().get();
// end file
writer.endFile(fileSchema);
// update the IntervalFile for interval list
@@ -388,16 +379,16 @@ public class BufferWriteProcessor extends Processor {
// flush the changed information for filenode
filenodeFlushAction.act();
// delete the restore for this bufferwrite processor
- long closeEndTime = System.currentTimeMillis();
- long closeInterval = closeEndTime - closeStartTime;
- ZonedDateTime startDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- ZonedDateTime endDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeEndTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- LOGGER.info(
- "Close bufferwrite processor {}, the file name is {}, start time is {}, end time is {}, "
- + "time consumption is {}ms",
- getProcessorName(), fileName, startDateTime, endDateTime, closeInterval);
+ if (LOGGER.isInfoEnabled()) {
+ long closeEndTime = System.currentTimeMillis();
+ LOGGER.info(
+ "Close bufferwrite processor {}, the file name is {}, start time is {}, end time is {}, "
+ + "time consumption is {}ms",
+ getProcessorName(), fileName,
+ DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
+ DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
+ closeEndTime - closeStartTime);
+ }
} catch (IOException e) {
LOGGER.error("Close the bufferwrite processor error, the bufferwrite is {}.",
getProcessorName(), e);
@@ -420,9 +411,13 @@ public class BufferWriteProcessor extends Processor {
* @return True if flushing
*/
public boolean isFlush() {
- synchronized (flushStatus) {
- return flushStatus.isFlushing();
- }
+ // Starting a flush task has two steps: set flushMemTable, and then set flushFuture.
+ // So the following case exists: flushMemTable != null but flushFuture is done (because
+ // flushFuture still refers to the last finished flush).
+ // The following case exists too: flushMemTable == null but flushFuture is not done
+ // (flushTask() has not finished, but switchFlushToWork() has already completed).
+ // So, checking flushMemTable is more meaningful than flushFuture.isDone().
+ return flushMemTable != null;
}
/**
@@ -507,6 +502,22 @@ public class BufferWriteProcessor extends Processor {
}
/**
+ * used for test. We can know when the flush() is called.
+ * @return the last flush() time. Time unit: nanosecond.
+ */
+ public long getLastFlushTime() {
+ return lastFlushTime;
+ }
+
+ /**
+ * used for test. We can block to wait for finishing flushing.
+ * @return the future of the flush() task.
+ */
+ public Future<Boolean> getFlushFuture() {
+ return flushFuture;
+ }
+
+ /**
* Delete data whose timestamp <= 'timestamp' and belonging to timeseries deviceId.measurementId.
* Delete data in both working MemTable and flushing MemTable.
*
@@ -516,7 +527,7 @@ public class BufferWriteProcessor extends Processor {
*/
public void delete(String deviceId, String measurementId, long timestamp) {
workMemTable.delete(deviceId, measurementId, timestamp);
- if (isFlush) {
+ if (isFlush()) {
// flushing MemTable cannot be directly modified since another thread is reading it
flushMemTable = flushMemTable.copy();
flushMemTable.delete(deviceId, measurementId, timestamp);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeFlushFuture.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeFlushFuture.java
new file mode 100644
index 0000000..660c949
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeFlushFuture.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.filenode;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.iotdb.db.utils.ImmediateFuture;
+
+public class FileNodeFlushFuture implements Future<Boolean> {
+ Future<Boolean> bufferWriteFlushFuture;
+ Future<Boolean> overflowFlushFuture;
+ boolean hasOverflowFlushTask;
+
+ public FileNodeFlushFuture(Future<Boolean> bufferWriteFlushFuture, Future<Boolean> overflowFlushFuture){
+ if(bufferWriteFlushFuture != null) {
+ this.bufferWriteFlushFuture = bufferWriteFlushFuture;
+ } else {
+ this.bufferWriteFlushFuture = new ImmediateFuture<>(true);
+ }
+ if(overflowFlushFuture !=null) {
+ this.overflowFlushFuture = overflowFlushFuture;
+ hasOverflowFlushTask = true;
+ } else {
+ this.overflowFlushFuture = new ImmediateFuture<>(true);
+ hasOverflowFlushTask = false;
+ }
+ }
+
+ /**
+ * @param mayInterruptIfRunning true if the thread executing this task should be interrupted;
+ * otherwise, in-progress tasks are allowed to complete
+ * @return true if both of the two sub-futures are canceled successfully.
+ * @see Future#cancel(boolean) The difference is that this Future consists of two sub-Futures. If
+ * the first sub-future is canceled successfully but the second sub-future fails, the result is
+ * false.
+ */
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ boolean result = bufferWriteFlushFuture.cancel(mayInterruptIfRunning);
+ result = result & overflowFlushFuture.cancel(mayInterruptIfRunning);
+ return result;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return bufferWriteFlushFuture.isCancelled() && overflowFlushFuture.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return bufferWriteFlushFuture.isDone() && overflowFlushFuture.isDone();
+ }
+
+ @Override
+ public Boolean get() throws InterruptedException, ExecutionException {
+ boolean result = bufferWriteFlushFuture.get();
+ result = result & overflowFlushFuture.get();
+ return result;
+ }
+
+ @Override
+ public Boolean get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ boolean result = bufferWriteFlushFuture.get(timeout, unit);
+ result = result && overflowFlushFuture.get(timeout, unit);
+ return result;
+ }
+
+ public boolean isHasOverflowFlushTask() {
+ return hasOverflowFlushTask;
+ }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 7f6de86..c2f4126 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -1067,7 +1067,7 @@ public class FileNodeManager implements IStatistic, IService {
continue;
}
try {
- boolean isMerge = processor.flush();
+ boolean isMerge = processor.flush().isHasOverflowFlushTask();
if (isMerge) {
processor.submitToMerge();
}
@@ -1093,7 +1093,7 @@ public class FileNodeManager implements IStatistic, IService {
}
processor.writeLock();
try {
- boolean isMerge = processor.flush();
+ boolean isMerge = processor.flush().isHasOverflowFlushTask();
if (isMerge) {
processor.submitToMerge();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index a4fe3ee..b0921f3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -928,7 +928,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
* @return null -can't submit the merge task, because this filenode is not overflowed or it is
* merging now. Future - submit the merge task successfully.
*/
- public Future submitToMerge() {
+ Future submitToMerge() {
ZoneId zoneId = IoTDBDescriptor.getInstance().getConfig().getZoneID();
if (lastMergeTime > 0) {
long thisMergeTime = System.currentTimeMillis();
@@ -1695,14 +1695,16 @@ public class FileNodeProcessor extends Processor implements IStatistic {
}
@Override
- public boolean flush() throws IOException {
+ public FileNodeFlushFuture flush() throws IOException {
+ Future<Boolean> bufferWriteFlushFuture = null;
+ Future<Boolean> overflowFlushFuture = null;
if (bufferWriteProcessor != null) {
- bufferWriteProcessor.flush();
+ bufferWriteFlushFuture = bufferWriteProcessor.flush();
}
if (overflowProcessor != null) {
- return overflowProcessor.flush();
+ overflowFlushFuture = overflowProcessor.flush();
}
- return false;
+ return new FileNodeFlushFuture(bufferWriteFlushFuture, overflowFlushFuture);
}
/**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 01801d9..331fa33 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -27,10 +27,10 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -47,15 +47,15 @@ import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.utils.FlushStatus;
-import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.OverflowProcessorException;
+import org.apache.iotdb.db.qp.constant.DatetimeUtils;
+import org.apache.iotdb.db.utils.ImmediateFuture;
+import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
@@ -70,14 +70,13 @@ public class OverflowProcessor extends Processor {
private static final Logger LOGGER = LoggerFactory.getLogger(OverflowProcessor.class);
private static final IoTDBConfig TsFileDBConf = IoTDBDescriptor.getInstance().getConfig();
- private static final TSFileConfig TsFileConf = TSFileDescriptor.getInstance().getConfig();
private OverflowResource workResource;
private OverflowResource mergeResource;
private OverflowSupport workSupport;
private OverflowSupport flushSupport;
- private volatile FlushStatus flushStatus = new FlushStatus();
+ private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true);
private volatile boolean isMerge;
private int valueCount;
private String parentPath;
@@ -243,7 +242,7 @@ public class OverflowProcessor extends Processor {
List<ModificationFile> updatedModFiles) throws IOException {
workResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
workSupport.delete(deviceId, measurementId, timestamp, false);
- if (flushStatus.isFlushing()) {
+ if (isFlush()) {
mergeResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
flushSupport.delete(deviceId, measurementId, timestamp, true);
}
@@ -300,15 +299,20 @@ public class OverflowProcessor extends Processor {
TSDataType dataType) {
MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
- if (flushStatus.isFlushing()) {
+ queryFlushLock.lock();
+ try {
+ if (flushSupport != null && isFlush()) {
+ memSeriesLazyMerger
+ .addMemSeries(
+ flushSupport.queryOverflowInsertInMemory(deviceId, measurementId, dataType));
+ }
memSeriesLazyMerger
- .addMemSeries(
- flushSupport.queryOverflowInsertInMemory(deviceId, measurementId, dataType));
+ .addMemSeries(workSupport.queryOverflowInsertInMemory(deviceId, measurementId,
+ dataType));
+ return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger);
+ } finally {
+ queryFlushLock.unlock();
}
- memSeriesLazyMerger
- .addMemSeries(workSupport.queryOverflowInsertInMemory(deviceId, measurementId,
- dataType));
- return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger);
}
/**
@@ -416,17 +420,16 @@ public class OverflowProcessor extends Processor {
}
public boolean isFlush() {
- synchronized (flushStatus) {
- return flushStatus.isFlushing();
- }
+ // see BufferWriteProcessor.isFlush()
+ return flushSupport != null;
}
- private void flushOperation(String flushFunction) {
+ private boolean flushTask(String displayMessage) {
+ boolean result;
long flushStartTime = System.currentTimeMillis();
try {
- LOGGER
- .info("The overflow processor {} starts flushing {}.", getProcessorName(),
- flushFunction);
+ LOGGER.info("The overflow processor {} starts flushing {}.", getProcessorName(),
+ displayMessage);
// flush data
workResource
.flush(fileSchema, flushSupport.getMemTabale(),
@@ -436,35 +439,37 @@ public class OverflowProcessor extends Processor {
if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
logNode.notifyEndFlush(null);
}
+ result = true;
} catch (IOException e) {
LOGGER.error("Flush overflow processor {} rowgroup to file error in {}. Thread {} exits.",
- getProcessorName(), flushFunction, Thread.currentThread().getName(), e);
+ getProcessorName(), displayMessage, Thread.currentThread().getName(), e);
+ result = false;
} catch (Exception e) {
LOGGER.error("FilenodeFlushAction action failed. Thread {} exits.",
Thread.currentThread().getName(), e);
+ result = false;
} finally {
- synchronized (flushStatus) {
- flushStatus.setUnFlushing();
// switch from flush to work.
switchFlushToWork();
- flushStatus.notifyAll();
- }
}
// log flush time
- LOGGER.info("The overflow processor {} ends flushing {}.", getProcessorName(), flushFunction);
- long flushEndTime = System.currentTimeMillis();
- long timeInterval = flushEndTime - flushStartTime;
- ZonedDateTime startDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushStartTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- ZonedDateTime endDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushEndTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID());
- LOGGER.info(
- "The overflow processor {} flush {}, start time is {}, flush end time is {}," +
- " time consumption is {}ms",
- getProcessorName(), flushFunction, startDateTime, endDateTime, timeInterval);
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER
+ .info("The overflow processor {} ends flushing {}.", getProcessorName(), displayMessage);
+ long flushEndTime = System.currentTimeMillis();
+ LOGGER.info(
+ "The overflow processor {} flush {}, start time is {}, flush end time is {}," +
+ " time consumption is {}ms",
+ getProcessorName(), displayMessage,
+ DatetimeUtils.convertMillsecondToZonedDateTime(flushStartTime),
+ DatetimeUtils.convertMillsecondToZonedDateTime(flushEndTime),
+ flushEndTime - flushStartTime);
+ }
+ return result;
}
- private Future<?> flush(boolean synchronization) throws OverflowProcessorException {
+ @Override
+ public synchronized Future<Boolean> flush() throws IOException {
// statistic information for flush
if (lastFlushTime > 0) {
long thisFLushTime = System.currentTimeMillis();
@@ -481,21 +486,20 @@ public class OverflowProcessor extends Processor {
lastFlushTime = System.currentTimeMillis();
// value count
if (valueCount > 0) {
- synchronized (flushStatus) {
- while (flushStatus.isFlushing()) {
- try {
- flushStatus.wait();
- } catch (InterruptedException e) {
- LOGGER.error("Waiting the flushstate error in flush row group to store.", e);
- }
- }
+ try {
+ flushFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.error("Encounter an interrupt error when waitting for the flushing, "
+ + "the bufferwrite processor is {}.",
+ getProcessorName(), e);
+ Thread.currentThread().interrupt();
}
try {
// backup newIntervalFile list and emptyIntervalFileNode
overflowFlushAction.act();
} catch (Exception e) {
LOGGER.error("Flush the overflow rowGroup to file faied, when overflowFlushAction act");
- throw new OverflowProcessorException(e);
+ throw new IOException(e);
}
if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
@@ -510,30 +514,14 @@ public class OverflowProcessor extends Processor {
memSize.set(0);
valueCount = 0;
// switch from work to flush
- flushStatus.setFlushing();
switchWorkToFlush();
- if (synchronization) {
- flushOperation("synchronously");
- } else {
- FlushManager.getInstance().submit(new Runnable() {
- @Override
- public void run() {
- flushOperation("asynchronously");
- }
- });
- }
+ flushFuture = FlushManager.getInstance().submit( () ->
+ flushTask("asynchronously"));
+ } else {
+ flushFuture = new ImmediateFuture(true);
}
- return null;
- }
+ return flushFuture;
- @Override
- public boolean flush() throws IOException {
- try {
- flush(false);
- } catch (OverflowProcessorException e) {
- throw new IOException(e);
- }
- return false;
}
@Override
@@ -541,17 +529,27 @@ public class OverflowProcessor extends Processor {
LOGGER.info("The overflow processor {} starts close operation.", getProcessorName());
long closeStartTime = System.currentTimeMillis();
// flush data
- flush(true);
- LOGGER.info("The overflow processor {} ends close operation.", getProcessorName());
- // log close time
- long closeEndTime = System.currentTimeMillis();
- LOGGER.info(
- "The close operation of overflow processor {} starts at {} and ends at {}."
- + " It comsumes {}ms.",
- getProcessorName(), ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID()),
- ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
- IoTDBDescriptor.getInstance().getConfig().getZoneID()), closeEndTime - closeStartTime);
+ try {
+ flush().get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.error("Encounter an interrupt error when waitting for the flushing, "
+ + "the bufferwrite processor is {}.",
+ getProcessorName(), e);
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ throw new OverflowProcessorException(e);
+ }
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("The overflow processor {} ends close operation.", getProcessorName());
+ // log close time
+ long closeEndTime = System.currentTimeMillis();
+ LOGGER.info(
+ "The close operation of overflow processor {} starts at {} and ends at {}."
+ + " It comsumes {}ms.",
+ getProcessorName(), DatetimeUtils.convertMillsecondToZonedDateTime(closeStartTime),
+ DatetimeUtils.convertMillsecondToZonedDateTime(closeEndTime),
+ closeEndTime - closeStartTime);
+ }
}
public void clear() throws IOException {
@@ -641,29 +639,46 @@ public class OverflowProcessor extends Processor {
}
OverflowProcessor that = (OverflowProcessor) o;
return isMerge == that.isMerge &&
- valueCount == that.valueCount &&
- lastFlushTime == that.lastFlushTime &&
- memThreshold == that.memThreshold &&
- Objects.equals(workResource, that.workResource) &&
- Objects.equals(mergeResource, that.mergeResource) &&
- Objects.equals(workSupport, that.workSupport) &&
- Objects.equals(flushSupport, that.flushSupport) &&
- Objects.equals(flushStatus, that.flushStatus) &&
- Objects.equals(parentPath, that.parentPath) &&
- Objects.equals(dataPathCount, that.dataPathCount) &&
- Objects.equals(queryFlushLock, that.queryFlushLock) &&
- Objects.equals(overflowFlushAction, that.overflowFlushAction) &&
- Objects.equals(filenodeFlushAction, that.filenodeFlushAction) &&
- Objects.equals(fileSchema, that.fileSchema) &&
- Objects.equals(memSize, that.memSize) &&
- Objects.equals(logNode, that.logNode);
+ valueCount == that.valueCount &&
+ lastFlushTime == that.lastFlushTime &&
+ memThreshold == that.memThreshold &&
+ Objects.equals(workResource, that.workResource) &&
+ Objects.equals(mergeResource, that.mergeResource) &&
+ Objects.equals(workSupport, that.workSupport) &&
+ Objects.equals(flushSupport, that.flushSupport) &&
+ Objects.equals(flushFuture, that.flushFuture) &&
+ Objects.equals(parentPath, that.parentPath) &&
+ Objects.equals(dataPathCount, that.dataPathCount) &&
+ Objects.equals(queryFlushLock, that.queryFlushLock) &&
+ Objects.equals(overflowFlushAction, that.overflowFlushAction) &&
+ Objects.equals(filenodeFlushAction, that.filenodeFlushAction) &&
+ Objects.equals(fileSchema, that.fileSchema) &&
+ Objects.equals(memSize, that.memSize) &&
+ Objects.equals(logNode, that.logNode) &&
+ Objects.equals(flushFuture, that.flushFuture);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), workResource, mergeResource, workSupport,
- flushSupport, flushStatus, isMerge, valueCount, parentPath, lastFlushTime,
- dataPathCount, queryFlushLock, overflowFlushAction, filenodeFlushAction, fileSchema,
- memThreshold, memSize, logNode);
+ flushSupport, flushFuture, isMerge, valueCount, parentPath, lastFlushTime,
+ dataPathCount, queryFlushLock, overflowFlushAction, filenodeFlushAction, fileSchema,
+ memThreshold, memSize, logNode); // flushFuture is hashed once, two lines up; repeating it added nothing
+ }
+
+ /**
+ * Used for tests: the caller can block on the returned future until flushing has finished.
+ * @return the future held in {@code flushFuture} for the flush() task.
+ */
+ public Future<Boolean> getFlushFuture() {
+ return flushFuture;
+ }
+
+ /**
+ * Used for tests: lets the caller find out when flush() was called.
+ * @return the value of {@code lastFlushTime}, i.e. the last flush() time.
+ */
+ public long getLastFlushTime() {
+ return lastFlushTime;
}
}
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
index 0e91b61..9a1e323 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/pool/FlushManager.java
@@ -18,8 +18,10 @@
*/
package org.apache.iotdb.db.engine.pool;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
@@ -111,8 +113,12 @@ public class FlushManager {
}
}
- public synchronized void submit(Runnable task) {
- pool.execute(task);
+ public synchronized Future<?> submit(Runnable task) {
+ return pool.submit(task); // submit() instead of execute(): the caller now gets a Future back
+ }
+
+ public synchronized <T>Future<T> submit(Callable<T> task){
+ return pool.submit(task); // hands the pool's Future for this task back to the caller
}
public int getActiveCnt() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java
index 5011071..266c439 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/constant/DatetimeUtils.java
@@ -27,6 +27,7 @@ import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.SignStyle;
import java.time.temporal.ChronoField;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
public class DatetimeUtils {
@@ -206,4 +207,9 @@ public class DatetimeUtils {
public static ZoneOffset toZoneOffset(ZoneId zoneId) {
return zoneId.getRules().getOffset(Instant.now());
}
+
+ public static ZonedDateTime convertMillsecondToZonedDateTime(long millisecond) {
+ ZoneId configuredZone = IoTDBDescriptor.getInstance().getConfig().getZoneID(); // zone from config
+ return ZonedDateTime.ofInstant(Instant.ofEpochMilli(millisecond), configuredZone);
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/utils/FlushStatus.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/ImmediateFuture.java
similarity index 51%
rename from iotdb/src/main/java/org/apache/iotdb/db/engine/utils/FlushStatus.java
rename to iotdb/src/main/java/org/apache/iotdb/db/utils/ImmediateFuture.java
index f437c96..340a427 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/utils/FlushStatus.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/ImmediateFuture.java
@@ -16,30 +16,43 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.engine.utils;
-/**
- * This class is used to represent the state of flush. It's can be used in the bufferwrite
- * flush{@code SequenceFileManager} and overflow flush{@code OverFlowProcessor}.
- */
-public class FlushStatus {
+package org.apache.iotdb.db.utils;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
- private boolean isFlushing;
+public class ImmediateFuture<T> implements Future<T> { // a Future that is already complete when created
- public FlushStatus() {
- this.isFlushing = false;
+ T result; // the value handed back by both get() variants
+ public ImmediateFuture(T result){
+ this.result = result;
+ }
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false; // Future contract: cancel() fails once the task has completed, which is always the case here
}
- public boolean isFlushing() {
- return isFlushing;
+ @Override
+ public boolean isCancelled() {
+ return false; // never cancelled: the result exists from construction and get() returns it normally
}
- public void setFlushing() {
- this.isFlushing = true;
+ @Override
+ public boolean isDone() {
+ return true; // complete from the moment of construction
}
- public void setUnFlushing() {
- this.isFlushing = false;
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ return result; // returns at once; nothing to wait for
}
+ @Override
+ public T get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return result; // the timeout is irrelevant because the value is already available
+ }
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/ProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/ProcessorTest.java
index 22cc44a..75fa7d1 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/ProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/ProcessorTest.java
@@ -22,8 +22,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.io.IOException;
+import java.util.concurrent.Future;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.ImmediateFuture;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -106,9 +108,8 @@ public class ProcessorTest {
}
@Override
- public boolean flush() throws IOException {
- // TODO Auto-generated method stub
- return false;
+ public Future<Boolean> flush() throws IOException {
+ return new ImmediateFuture<>(true); // test stub: hands back a ready-made future holding true
}
@Override
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
index cdd9ec3..3b218f9 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.engine.bufferwrite;
+import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import java.io.File;
import java.io.IOException;
@@ -27,7 +29,9 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -41,11 +45,14 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class BufferWriteProcessorNewTest {
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(BufferWriteProcessorNewTest.class);
Action bfflushaction = new Action() {
@Override
@@ -99,15 +106,15 @@ public class BufferWriteProcessorNewTest {
FileSchemaUtils.constructFileSchema(processorName));
assertEquals(filename, bufferwrite.getFileName());
assertEquals(processorName + File.separator + filename, bufferwrite.getFileRelativePath());
- assertEquals(true, bufferwrite.isNewProcessor());
+ assertTrue(bufferwrite.isNewProcessor());
bufferwrite.setNewProcessor(false);
- assertEquals(false, bufferwrite.isNewProcessor());
+ assertFalse(bufferwrite.isNewProcessor());
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferwrite
.queryBufferWriteData(processorName,
measurementId, dataType);
ReadOnlyMemChunk left = pair.left;
List<ChunkMetaData> right = pair.right;
- assertEquals(true, left.isEmpty());
+ assertTrue(left.isEmpty());
assertEquals(0, right.size());
for (int i = 1; i <= 100; i++) {
bufferwrite.write(processorName, measurementId, i, dataType, String.valueOf(i));
@@ -116,7 +123,7 @@ public class BufferWriteProcessorNewTest {
pair = bufferwrite.queryBufferWriteData(processorName, measurementId, dataType);
left = pair.left;
right = pair.right;
- assertEquals(false, left.isEmpty());
+ assertFalse(left.isEmpty());
int num = 1;
Iterator<TimeValuePair> iterator = left.getIterator();
for (; num <= 100; num++) {
@@ -125,19 +132,25 @@ public class BufferWriteProcessorNewTest {
assertEquals(num, timeValuePair.getTimestamp());
assertEquals(num, timeValuePair.getValue().getInt());
}
- assertEquals(false, bufferwrite.isFlush());
+ assertFalse(bufferwrite.isFlush());
+ long lastFlushTime = bufferwrite.getLastFlushTime();
// flush asynchronously
bufferwrite.flush();
- assertEquals(true, bufferwrite.isFlush());
- assertEquals(true, bufferwrite.canBeClosed());
+ assertTrue(bufferwrite.getLastFlushTime() != lastFlushTime);
+ assertTrue(bufferwrite.canBeClosed());
// waiting for the end of flush.
- while (bufferwrite.isFlush()) {
- TimeUnit.SECONDS.sleep(1);
+ try {
+ bufferwrite.getFlushFuture().get(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ //because UT uses a mock flush operation, 10 seconds should be enough.
+ LOGGER.error(e.getMessage(),e);
+ Assert.fail("mock flush spends more than 10 seconds... "
+ + "Please modify the value or change a better test environment");
}
pair = bufferwrite.queryBufferWriteData(processorName, measurementId, dataType);
left = pair.left;
right = pair.right;
- assertEquals(true, left.isEmpty());
+ assertTrue(left.isEmpty());
assertEquals(1, right.size());
assertEquals(measurementId, right.get(0).getMeasurementUid());
assertEquals(dataType, right.get(0).getTsDataType());
@@ -150,7 +163,7 @@ public class BufferWriteProcessorNewTest {
pair = bufferWriteProcessor.queryBufferWriteData(processorName, measurementId, dataType);
left = pair.left;
right = pair.right;
- assertEquals(true, left.isEmpty());
+ assertTrue(left.isEmpty());
assertEquals(1, right.size());
assertEquals(measurementId, right.get(0).getMeasurementUid());
assertEquals(dataType, right.get(0).getTsDataType());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
index 06d47f1..ea2d743 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
@@ -20,7 +20,11 @@
package org.apache.iotdb.db.engine.bufferwrite;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import ch.qos.logback.core.util.TimeUtil;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -28,7 +32,9 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.iotdb.db.conf.directories.Directories;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.PathUtils;
@@ -49,9 +55,13 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class BufferWriteProcessorTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BufferWriteProcessorTest.class);
+
Action bfflushaction = new Action() {
@Override
@@ -124,7 +134,7 @@ public class BufferWriteProcessorTest {
// check file
String restoreFilePath = insertPath + ".restore";
File restoreFile = new File(dataFile, restoreFilePath);
- assertEquals(true, restoreFile.exists());
+ assertTrue(restoreFile.exists());
File insertFile = new File(dataFile, insertPath);
long insertFileLength = insertFile.length();
FileOutputStream fileOutputStream = new FileOutputStream(insertFile.getPath(), true);
@@ -140,18 +150,18 @@ public class BufferWriteProcessorTest {
directories.getFolderForTest(), deviceId,
insertPath, parameters, SysTimeVersionController.INSTANCE,
FileSchemaUtils.constructFileSchema(deviceId));
- assertEquals(true, insertFile.exists());
+ assertTrue(insertFile.exists());
assertEquals(insertFileLength, insertFile.length());
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferWriteProcessor
.queryBufferWriteData(deviceId,
measurementId, dataType);
- assertEquals(true, pair.left.isEmpty());
+ assertTrue(pair.left.isEmpty());
assertEquals(1, pair.right.size());
ChunkMetaData chunkMetaData = pair.right.get(0);
assertEquals(measurementId, chunkMetaData.getMeasurementUid());
assertEquals(dataType, chunkMetaData.getTsDataType());
bufferWriteProcessor.close();
- assertEquals(false, restoreFile.exists());
+ assertFalse(restoreFile.exists());
}
@Test
@@ -169,7 +179,7 @@ public class BufferWriteProcessorTest {
// check file
String restoreFilePath = insertPath + ".restore";
File restoreFile = new File(dataFile, restoreFilePath);
- assertEquals(true, restoreFile.exists());
+ assertTrue(restoreFile.exists());
BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor(
directories.getFolderForTest(), deviceId,
insertPath, parameters, SysTimeVersionController.INSTANCE,
@@ -177,14 +187,14 @@ public class BufferWriteProcessorTest {
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferWriteProcessor
.queryBufferWriteData(deviceId,
measurementId, dataType);
- assertEquals(true, pair.left.isEmpty());
+ assertTrue(pair.left.isEmpty());
assertEquals(1, pair.right.size());
ChunkMetaData chunkMetaData = pair.right.get(0);
assertEquals(measurementId, chunkMetaData.getMeasurementUid());
assertEquals(dataType, chunkMetaData.getTsDataType());
bufferWriteProcessor.close();
bufferwrite.close();
- assertEquals(false, restoreFile.exists());
+ assertFalse(restoreFile.exists());
}
@Test
@@ -193,26 +203,36 @@ public class BufferWriteProcessorTest {
bufferwrite = new BufferWriteProcessor(directories.getFolderForTest(), deviceId, insertPath,
parameters, SysTimeVersionController.INSTANCE,
FileSchemaUtils.constructFileSchema(deviceId));
- assertEquals(false, bufferwrite.isFlush());
- assertEquals(true, bufferwrite.canBeClosed());
+ assertFalse(bufferwrite.isFlush());
+ assertTrue(bufferwrite.canBeClosed());
assertEquals(0, bufferwrite.memoryUsage());
assertEquals(TsFileIOWriter.magicStringBytes.length, bufferwrite.getFileSize());
assertEquals(0, bufferwrite.getMetaSize());
+ long lastFlushTime = bufferwrite.getLastFlushTime();
for (int i = 1; i <= 85; i++) {
bufferwrite.write(deviceId, measurementId, i, dataType, String.valueOf(i));
assertEquals(i * 12, bufferwrite.memoryUsage());
}
+ assertEquals(lastFlushTime, bufferwrite.getLastFlushTime());
bufferwrite.write(deviceId, measurementId, 86, dataType, String.valueOf(86));
- assertEquals(true, bufferwrite.isFlush());
+ //assert a flush() is called.
+ assertNotEquals(bufferwrite.getLastFlushTime(), lastFlushTime);
// sleep to the end of flush
- TimeUnit.SECONDS.sleep(2);
- assertEquals(false, bufferwrite.isFlush());
+ try {
+ bufferwrite.getFlushFuture().get(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ //because UT uses a mock flush operation, 10 seconds should be enough.
+ LOGGER.error(e.getMessage(), e);
+ Assert.fail("mock flush spends more than 10 seconds... "
+ + "Please modify the value or change a better test environment");
+ }
+ assertFalse(bufferwrite.isFlush());
assertEquals(0, bufferwrite.memoryUsage());
// query result
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferwrite
.queryBufferWriteData(deviceId, measurementId,
dataType);
- assertEquals(true, pair.left.isEmpty());
+ assertTrue(pair.left.isEmpty());
assertEquals(1, pair.right.size());
ChunkMetaData chunkMetaData = pair.right.get(0);
assertEquals(measurementId, chunkMetaData.getMeasurementUid());
@@ -223,7 +243,7 @@ public class BufferWriteProcessorTest {
}
pair = bufferwrite.queryBufferWriteData(deviceId, measurementId, dataType);
ReadOnlyMemChunk rawSeriesChunk = (ReadOnlyMemChunk) pair.left;
- assertEquals(false, rawSeriesChunk.isEmpty());
+ assertFalse(rawSeriesChunk.isEmpty());
assertEquals(87, rawSeriesChunk.getMinTimestamp());
Assert.assertEquals(87, rawSeriesChunk.getValueAtMinTime().getInt());
assertEquals(100, rawSeriesChunk.getMaxTimestamp());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
index 4c4ee05..867e276 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
@@ -95,7 +95,6 @@ public class OverflowProcessorTest {
assertEquals(0,
overflowSeriesDataSource.getOverflowInsertFileList().get(0).getChunkMetaDataList().size());
processor.flush();
- assertEquals(false, processor.isFlush());
assertEquals(false, processor.isMerge());
// write insert data
OverflowTestUtils.produceInsertData(processor);