You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/12/27 08:45:32 UTC
[incubator-iotdb] 01/01: abstract TsFileFlushPolicy and allow
specifying storage groups in flush command
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch pull_up_file_close
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 36ba3d8c6e5cb8108b146e3af3c208f99bd1ca6e
Author: jt2594838 <jt...@163.com>
AuthorDate: Fri Dec 27 16:44:43 2019 +0800
abstract TsFileFlushPolicy and allow specifying storage groups in flush command
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 54 ++++++++----------
.../iotdb/db/engine/flush/TsFileFlushPolicy.java | 54 ++++++++++++++++++
.../engine/storagegroup/StorageGroupProcessor.java | 37 ++++---------
.../db/engine/storagegroup/TsFileProcessor.java | 7 +--
.../org/apache/iotdb/db/service/TSServiceImpl.java | 31 ++++++++++-
.../db/engine/cache/DeviceMetaDataCacheTest.java | 3 +-
.../storagegroup/StorageGroupProcessorTest.java | 3 +-
.../iotdb/db/engine/storagegroup/TTLTest.java | 3 +-
.../db/integration/IoTDBFlushQueryMergeTest.java | 64 +++++++++++++++++++++-
.../iotdb/db/query/reader/ReaderTestHelper.java | 3 +-
10 files changed, 188 insertions(+), 71 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index f379f0d..7c1c87a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.engine;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
@@ -39,6 +38,8 @@ import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -48,6 +49,7 @@ import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupException;
+import org.apache.iotdb.db.exception.storageGroup.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.MNode;
@@ -71,7 +73,7 @@ public class StorageEngine implements IService {
private final Logger logger;
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private static final long TTL_CHECK_INTERVAL = 60 * 1000;
+ private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
/**
* a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
@@ -94,6 +96,7 @@ public class StorageEngine implements IService {
}
private ScheduledExecutorService ttlCheckThread;
+ private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
private StorageEngine() {
logger = LoggerFactory.getLogger(StorageEngine.class);
@@ -116,7 +119,7 @@ public class StorageEngine implements IService {
for (MNode storageGroup : sgNodes) {
futures.add(recoveryThreadPool.submit((Callable<Void>) () -> {
StorageGroupProcessor processor = new StorageGroupProcessor(systemDir,
- storageGroup.getFullPath());
+ storageGroup.getFullPath(), fileFlushPolicy);
processor.setDataTTL(storageGroup.getDataTTL());
processorMap.put(storageGroup.getFullPath(), processor);
logger.info("Storage Group Processor {} is recovered successfully",
@@ -182,7 +185,7 @@ public class StorageEngine implements IService {
if (processor == null) {
logger.info("construct a processor instance, the storage group is {}, Thread is {}",
storageGroupName, Thread.currentThread().getId());
- processor = new StorageGroupProcessor(systemDir, storageGroupName);
+ processor = new StorageGroupProcessor(systemDir, storageGroupName, fileFlushPolicy);
processor.setDataTTL(
MManager.getInstance().getNodeByPathWithCheck(storageGroupName).getDataTTL());
processorMap.put(storageGroupName, processor);
@@ -264,6 +267,21 @@ public class StorageEngine implements IService {
}
}
+ public void asyncCloseProcessor(String storageGroupName, boolean isSeq)
+ throws StorageGroupNotSetException {
+ StorageGroupProcessor processor = processorMap.get(storageGroupName);
+ if (storageGroupName != null) {
+ processor.writeLock();
+ try {
+ processor.moveOneWorkProcessorToClosingList(isSeq);
+ } finally {
+ processor.writeUnlock();
+ }
+ } else {
+ throw new StorageGroupNotSetException(storageGroupName);
+ }
+ }
+
/**
* update data.
*/
@@ -308,34 +326,6 @@ public class StorageEngine implements IService {
}
/**
- * Append one specified tsfile to the storage group. <b>This method is only provided for
- * transmission module</b>
- *
- * @param storageGroupName the seriesPath of storage group
- * @param appendFile the appended tsfile information
- */
- @SuppressWarnings("unused") // reimplement sync module
- public boolean appendFileToStorageGroupProcessor(String storageGroupName,
- TsFileResource appendFile,
- String appendFilePath) throws StorageEngineException {
- // TODO reimplement sync module
- return true;
- }
-
- /**
- * get all overlap TsFiles which are conflict with the appendFile.
- *
- * @param storageGroupName the seriesPath of storage group
- * @param appendFile the appended tsfile information
- */
- @SuppressWarnings("unused") // reimplement sync module
- public List<String> getOverlapFiles(String storageGroupName, TsFileResource appendFile,
- String uuid) throws StorageEngineException {
- // TODO reimplement sync module
- return Collections.emptyList();
- }
-
- /**
* count all Tsfiles which need to be upgraded
*
* @return total num of the tsfiles which need to be upgraded
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
new file mode 100644
index 0000000..06d8d19
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.flush;
+
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TsFileFlushPolicy is applied when a TsFileProcessor is full after insertion. For standalone
+ * IoTDB, the flush or close is executed without constraint. But in the distributed version, the
+ * close is controlled by the leader and should not be performed by the follower alone.
+ */
+public interface TsFileFlushPolicy {
+
+ void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor processor, boolean isSeq);
+
+ class DirectFlushPolicy implements TsFileFlushPolicy{
+
+ private static final Logger logger = LoggerFactory.getLogger(DirectFlushPolicy.class);
+
+ @Override
+ public void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor tsFileProcessor,
+ boolean isSeq) {
+ logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
+ tsFileProcessor.getWorkMemTableMemory(),
+ tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
+
+ if (tsFileProcessor.shouldClose()) {
+ storageGroupProcessor.moveOneWorkProcessorToClosingList(isSeq);
+ } else {
+ tsFileProcessor.asyncFlush();
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 741b902..646d9cc 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector;
@@ -184,10 +185,12 @@ public class StorageGroupProcessor {
private long dataTTL = Long.MAX_VALUE;
private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
+ private TsFileFlushPolicy fileFlushPolicy;
- public StorageGroupProcessor(String systemInfoDir, String storageGroupName)
+ public StorageGroupProcessor(String systemInfoDir, String storageGroupName, TsFileFlushPolicy fileFlushPolicy)
throws StorageGroupProcessorException {
this.storageGroupName = storageGroupName;
+ this.fileFlushPolicy = fileFlushPolicy;
// construct the file schema
this.schema = constructSchema(storageGroupName);
@@ -437,15 +440,7 @@ public class StorageGroupProcessor {
// check memtable size and may asyncTryToFlush the work memtable
if (tsFileProcessor.shouldFlush()) {
- logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
- tsFileProcessor.getWorkMemTableMemory(),
- tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
-
- if (tsFileProcessor.shouldClose()) {
- moveOneWorkProcessorToClosingList(sequence);
- } else {
- tsFileProcessor.asyncFlush();
- }
+ fileFlushPolicy.apply(this, tsFileProcessor, sequence);
}
}
@@ -470,15 +465,7 @@ public class StorageGroupProcessor {
// check memtable size and may asyncTryToFlush the work memtable
if (tsFileProcessor.shouldFlush()) {
- logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
- tsFileProcessor.getWorkMemTableMemory(),
- tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
-
- if (tsFileProcessor.shouldClose()) {
- moveOneWorkProcessorToClosingList(sequence);
- } else {
- tsFileProcessor.asyncFlush();
- }
+ fileFlushPolicy.apply(this, tsFileProcessor, sequence);
}
}
@@ -540,18 +527,18 @@ public class StorageGroupProcessor {
/**
- * only called by insert(), thread-safety should be ensured by caller
+ * thread-safety should be ensured by caller
*/
- private void moveOneWorkProcessorToClosingList(boolean sequence) {
+ public void moveOneWorkProcessorToClosingList(boolean sequence) {
//for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
//for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
- if (sequence) {
+ if (sequence && workSequenceTsFileProcessor != null) {
closingSequenceTsFileProcessor.add(workSequenceTsFileProcessor);
updateEndTimeMap(workSequenceTsFileProcessor);
workSequenceTsFileProcessor.asyncClose();
workSequenceTsFileProcessor = null;
logger.info("close a sequence tsfile processor {}", storageGroupName);
- } else {
+ } else if (workUnSequenceTsFileProcessor != null){
closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor);
workUnSequenceTsFileProcessor.asyncClose();
workUnSequenceTsFileProcessor = null;
@@ -765,11 +752,11 @@ public class StorageGroupProcessor {
return sensorSet;
}
- private void writeLock() {
+ public void writeLock() {
insertLock.writeLock().lock();
}
- private void writeUnlock() {
+ public void writeUnlock() {
insertLock.writeLock().unlock();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index eec8828..c403faf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -48,7 +48,6 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseTsFile
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -224,7 +223,7 @@ public class TsFileProcessor {
}
}
- TsFileResource getTsFileResource() {
+ public TsFileResource getTsFileResource() {
return tsFileResource;
}
@@ -250,7 +249,7 @@ public class TsFileProcessor {
}
- boolean shouldClose() {
+ public boolean shouldClose() {
long fileSize = tsFileResource.getFileSize();
long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig()
.getTsFileSizeThreshold();
@@ -541,7 +540,7 @@ public class TsFileProcessor {
return flushingMemTables.size();
}
- long getWorkMemTableMemory() {
+ public long getWorkMemTableMemory() {
return workMemTable.memSize();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index e1b374f..55df7c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.runtime.SQLParserException;
+import org.apache.iotdb.db.exception.storageGroup.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metrics.server.SqlArgument;
import org.apache.iotdb.db.qp.QueryProcessor;
@@ -469,10 +470,15 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
return false;
}
statement = statement.toLowerCase();
+ if (statement.startsWith("flush")) {
+ try {
+ execFlush(statement);
+ } catch (StorageGroupNotSetException e) {
+ throw new StorageEngineException(e);
+ }
+ return true;
+ }
switch (statement) {
- case "flush":
- StorageEngine.getInstance().syncCloseAllProcessor();
- return true;
case "merge":
StorageEngine.getInstance()
.mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
@@ -485,6 +491,25 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
}
}
+ private void execFlush(String statement) throws StorageGroupNotSetException {
+ String[] args = statement.split("\\s+");
+ if (args.length == 1) {
+ StorageEngine.getInstance().syncCloseAllProcessor();
+ } else if (args.length == 2){
+ String[] storageGroups = args[1].split(",\\s*");
+ for (String storageGroup : storageGroups) {
+ StorageEngine.getInstance().asyncCloseProcessor(storageGroup, true);
+ StorageEngine.getInstance().asyncCloseProcessor(storageGroup, false);
+ }
+ } else {
+ String[] storageGroups = args[1].split(",\\s*");
+ boolean isSeq = Boolean.parseBoolean(args[2]);
+ for (String storageGroup : storageGroups) {
+ StorageEngine.getInstance().asyncCloseProcessor(storageGroup, isSeq);
+ }
+ }
+ }
+
@Override
public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) {
long t1 = System.currentTimeMillis();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
index d14e21e..5cec03d 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -68,7 +69,7 @@ public class DeviceMetaDataCacheTest {
EnvironmentUtils.envSetUp();
MetadataManagerHelper.initMetadata();
ActiveTimeSeriesCounter.getInstance().init(storageGroup);
- storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup);
+ storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup, new DirectFlushPolicy());
insertData();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 410de23..aed98a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
@@ -262,7 +263,7 @@ public class StorageGroupProcessorTest {
class DummySGP extends StorageGroupProcessor {
DummySGP(String systemInfoDir, String storageGroupName) throws StorageGroupProcessorException {
- super(systemInfoDir, storageGroupName);
+ super(systemInfoDir, storageGroupName, new TsFileFlushPolicy.DirectFlushPolicy());
}
@Override
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 334cfe2..3187063 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -31,6 +31,7 @@ import java.util.List;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.StartupException;
@@ -90,7 +91,7 @@ public class TTLTest {
MManager.getInstance().setStorageGroupToMTree(sg1);
MManager.getInstance().setStorageGroupToMTree(sg2);
storageGroupProcessor = new StorageGroupProcessor(IoTDBDescriptor.getInstance().getConfig()
- .getSystemDir(), sg1);
+ .getSystemDir(), sg1, new DirectFlushPolicy());
MManager.getInstance().addPathToMTree(g1s1, TSDataType.INT64, TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED, Collections.emptyMap());
storageGroupProcessor.addMeasurement("s1", TSDataType.INT64, TSEncoding.PLAIN,
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
index 286b6ce..c3c9513 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java
@@ -18,6 +18,14 @@
*/
package org.apache.iotdb.db.integration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
@@ -25,8 +33,6 @@ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.sql.*;
-import static org.junit.Assert.fail;
public class IoTDBFlushQueryMergeTest {
@@ -81,7 +87,7 @@ public class IoTDBFlushQueryMergeTest {
}
@Test
- public void selectAllSQLTest() throws ClassNotFoundException, SQLException {
+ public void selectAllSQLTest() throws ClassNotFoundException {
Class.forName(Config.JDBC_DRIVER_NAME);
try (Connection connection = DriverManager
@@ -102,4 +108,56 @@ public class IoTDBFlushQueryMergeTest {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testFlushGivenGroup() throws ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
+ String insertTemplate =
+ "INSERT INTO root.group%d(timestamp, s1, s2, s3) VALUES (%d, %d, %f, %s)";
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ statement.execute("SET STORAGE GROUP TO root.group1");
+ statement.execute("SET STORAGE GROUP TO root.group2");
+ statement.execute("SET STORAGE GROUP TO root.group3");
+
+ for (int i = 1; i <= 3; i++) {
+ for (int j = 10; j < 20; j++) {
+ statement.execute(String.format(insertTemplate, i, j, j, j*0.1, String.valueOf(j)));
+ }
+ }
+ statement.execute("FLUSH");
+
+ for (int i = 1; i <= 3; i++) {
+ for (int j = 0; j < 10; j++) {
+ statement.execute(String.format(insertTemplate, i, j, j, j*0.1, String.valueOf(j)));
+ }
+ }
+ statement.execute("FLUSH root.group1");
+ statement.execute("FLUSH root.group2,root.group3");
+
+ for (int i = 1; i <= 3; i++) {
+ for (int j = 0; j < 30; j++) {
+ statement.execute(String.format(insertTemplate, i, j, j, j*0.1, String.valueOf(j)));
+ }
+ }
+ statement.execute("FLUSH root.group1 true");
+ statement.execute("FLUSH root.group2,root.group3 false");
+
+ ResultSet resultSet = statement.executeQuery("SELECT * from root.group1,root.group2,root"
+ + ".group3");
+ int i = 0;
+ while (resultSet.next()) {
+ i ++;
+ }
+ assertEquals(30, i);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
index 2677c9b..5173405 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
@@ -52,7 +53,7 @@ public abstract class ReaderTestHelper {
EnvironmentUtils.envSetUp();
MetadataManagerHelper.initMetadata();
ActiveTimeSeriesCounter.getInstance().init(storageGroup);
- storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup);
+ storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup, new DirectFlushPolicy());
insertData();
}