Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/10 08:30:32 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12] fix dead lock in compaction file selection (#3666)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 4e21af5 [To rel/0.12] fix dead lock in compaction file selection (#3666)
4e21af5 is described below
commit 4e21af5549022aedc4528053e1e3ae69ed4d0032
Author: liuxuxin <37...@users.noreply.github.com>
AuthorDate: Tue Aug 10 16:30:08 2021 +0800
[To rel/0.12] fix dead lock in compaction file selection (#3666)
---
.../resources/conf/iotdb-engine.properties | 4 -
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 -
.../db/engine/compaction/TsFileManagement.java | 2 +-
.../merge/task/CompactionMergeRecoverTask.java | 12 +-
.../db/integration/IoTDBNewTsFileCompactionIT.java | 1113 --------------------
6 files changed, 3 insertions(+), 1143 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index dbd1578..bf467cf 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -318,10 +318,6 @@ timestamp_precision=ms
# Whether to merge unseq files into seq files or not.
# enable_unseq_compaction=true
-# Works when the compaction_strategy is LEVEL_COMPACTION.
-# Whether to start next compaction task automatically after finish one compaction task
-# enable_continuous_compaction=true
-
# Start compaction task at this delay, unit is ms
# compaction_interval=30000
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 7e7be3e..f1c8e11 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -384,7 +384,7 @@ public class IoTDBConfig {
private int queryTimeoutThreshold = 60000;
/** compaction interval in ms */
- private long compactionInterval = 10000;
+ private long compactionInterval = 30000;
/** Replace implementation class of JDBC service */
private String rpcImplClassName = TSServiceImpl.class.getName();
@@ -1227,14 +1227,6 @@ public class IoTDBConfig {
this.mergeThreadNum = mergeThreadNum;
}
- public boolean isContinueMergeAfterReboot() {
- return continueMergeAfterReboot;
- }
-
- void setContinueMergeAfterReboot(boolean continueMergeAfterReboot) {
- this.continueMergeAfterReboot = continueMergeAfterReboot;
- }
-
public long getMergeIntervalSec() {
return mergeIntervalSec;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d4f59d1..2bdb253 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -473,11 +473,6 @@ public class IoTDBDescriptor {
properties.getProperty(
"merge_chunk_subthread_num",
Integer.toString(conf.getMergeChunkSubThreadNum()))));
- conf.setContinueMergeAfterReboot(
- Boolean.parseBoolean(
- properties.getProperty(
- "continue_merge_after_reboot",
- Boolean.toString(conf.isContinueMergeAfterReboot()))));
conf.setMergeFileSelectionTimeBudget(
Long.parseLong(
properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 838006c..c148ca1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -200,7 +200,7 @@ public abstract class TsFileManagement {
// wait until seq merge has finished
while (isSeqMerging) {
try {
- wait(200);
+ Thread.sleep(200);
} catch (InterruptedException e) {
logger.error("{} [Compaction] shutdown", storageGroupName, e);
Thread.currentThread().interrupt();
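
For readers skimming this hunk: the one-line switch from wait(200) to Thread.sleep(200) is the heart of the deadlock fix. Object.wait(long) may only be called while owning the object's monitor (otherwise it throws IllegalMonitorStateException) and gives that monitor up while waiting; Thread.sleep(long) pauses the current thread without acquiring or releasing any lock. The sketch below is not IoTDB code; SeqMergeFlag and its methods are hypothetical names used only to contrast the two polling styles. Which of the two wait()-related pitfalls actually produced the reported deadlock is not spelled out in this commit; the sketch only shows the semantic difference the patch relies on.

// Illustrative sketch only. SeqMergeFlag is a hypothetical class, not part of
// the IoTDB codebase; it contrasts wait()- and sleep()-based polling of a
// completion flag.
public class SeqMergeFlag {

  private volatile boolean isSeqMerging = true;

  // wait()-style polling: the caller must own this object's monitor, so the
  // method has to be synchronized (or wrapped in synchronized (this) {...}),
  // and the thread releases that monitor while it waits for a notify().
  public synchronized void pollWithWait() throws InterruptedException {
    while (isSeqMerging) {
      wait(200);
    }
  }

  // sleep()-style polling, the style this patch switches to: no monitor is
  // required or released, the thread simply re-checks the flag every 200 ms.
  public void pollWithSleep() throws InterruptedException {
    while (isSeqMerging) {
      Thread.sleep(200);
    }
  }

  // Called by the merging thread once the sequence-space merge finishes.
  public void finishSeqMerge() {
    isSeqMerging = false;
  }
}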
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
index 8385934..871961d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/CompactionMergeRecoverTask.java
@@ -19,9 +19,7 @@
package org.apache.iotdb.db.engine.merge.task;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.TsFileManagement;
-import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -29,7 +27,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.io.IOException;
import java.util.List;
@@ -73,14 +70,7 @@ public class CompactionMergeRecoverTask implements Runnable {
public void run() {
tsFileManagement.recovered = false;
try {
- recoverMergeTask.recoverMerge(
- IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
- File mergingMods =
- SystemFileFactory.INSTANCE.getFile(
- storageGroupSysDir, StorageGroupProcessor.MERGING_MODIFICATION_FILE_NAME);
- if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
- mergingMods.delete();
- }
+ recoverMergeTask.recoverMerge(true);
} catch (MetadataException | IOException e) {
logger.error(e.getMessage(), e);
}
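
The recovery hunk above makes the matching simplification on the restart path: instead of consulting the removed continue_merge_after_reboot flag (and conditionally deleting the merging modification file), recovery now always resumes the interrupted merge. A minimal, hypothetical sketch of the resulting shape, with RecoverableMerge and ResumeMergeOnRestart standing in for the real IoTDB types:

// Hypothetical sketch; RecoverableMerge and ResumeMergeOnRestart are not IoTDB
// types. It only illustrates the unconditional "always resume" recovery path
// this hunk converges on.
public class ResumeMergeOnRestart implements Runnable {

  public interface RecoverableMerge {
    // true = pick up and finish the interrupted merge rather than discarding it
    void recoverMerge(boolean continueMerge) throws Exception;
  }

  private final RecoverableMerge recoverMergeTask;

  public ResumeMergeOnRestart(RecoverableMerge recoverMergeTask) {
    this.recoverMergeTask = recoverMergeTask;
  }

  @Override
  public void run() {
    try {
      // No configuration lookup any more: recovery always continues the merge.
      recoverMergeTask.recoverMerge(true);
    } catch (Exception e) {
      // Mirror the original task's behavior of logging the failure and moving on.
      e.printStackTrace();
    }
  }
}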
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java
deleted file mode 100644
index 7bc0c81..0000000
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBNewTsFileCompactionIT.java
+++ /dev/null
@@ -1,1113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.integration;
-
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
-import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
-import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.jdbc.Config;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class IoTDBNewTsFileCompactionIT {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBNewTsFileCompactionIT.class);
-
- private int prevSeqLevelFileNum;
- private int prevSeqLevelNum;
- private int prevMergePagePointNumber;
- private int preMaxNumberOfPointsInPage;
- private CompactionStrategy preCompactionStrategy;
- private PartialPath storageGroupPath;
- // the unit is ns
- private static final long MAX_WAIT_TIME_FOR_MERGE = 1L * 60L * 1000L * 1000L * 1000L;
- private static final float FLOAT_DELTA = 0.00001f;
-
- @Before
- public void setUp() throws Exception {
- EnvironmentUtils.closeStatMonitor();
- prevSeqLevelFileNum = IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel();
- prevSeqLevelNum = IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum();
- prevMergePagePointNumber =
- IoTDBDescriptor.getInstance().getConfig().getMergePagePointNumberThreshold();
- preMaxNumberOfPointsInPage =
- TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
- preCompactionStrategy = IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy();
- storageGroupPath = new PartialPath("root.sg1");
- IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(2);
- IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(2);
- IoTDBDescriptor.getInstance().getConfig().setMergePagePointNumberThreshold(1);
- TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(1);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
- IoTDBDescriptor.getInstance().getConfig().setCompactionThreadNum(10);
- EnvironmentUtils.envSetUp();
- Class.forName(Config.JDBC_DRIVER_NAME);
-
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- statement.execute("SET STORAGE GROUP TO root.sg1");
- }
- }
-
- @After
- public void tearDown() throws Exception {
- EnvironmentUtils.cleanEnv();
- IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(prevSeqLevelFileNum);
- IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum);
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setMergePagePointNumberThreshold(prevMergePagePointNumber);
- TSFileDescriptor.getInstance()
- .getConfig()
- .setMaxNumberOfPointsInPage(preMaxNumberOfPointsInPage);
- IoTDBDescriptor.getInstance().getConfig().setCompactionStrategy(preCompactionStrategy);
- }
-
- /**
- * first file has only one page for each chunk and only one chunk for each time series second file
- * has only one page for each chunk and only one chunk for each time series
- */
- @Test
- public void test1() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"}
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(10000);
-
- // first file
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("FLUSH");
-
- // second file
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /**
- * first file has only one page for each chunk and two chunks for each time series second file has
- * only one page for each chunk and only one chunk for each time series
- */
- @Test
- public void test2() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"},
- {"3", "3"}
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(1);
-
- // first file
- // two chunks
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- statement.execute("FLUSH");
-
- // second file
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge Finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /**
- * first file has two pages for each chunk and only one chunk for each time series second file has
- * only one page for each chunk and only one chunk for each time series
- */
- @Test
- public void test3() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"},
- {"3", "3"},
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(10000);
-
- // first file
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- statement.execute("FLUSH");
-
- // second file
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge Finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /**
- * first file has two pages for each chunk and two chunks for each time series second file has
- * only one page for each chunk and only one chunk for each time series
- */
- @Test
- public void test4() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"},
- {"3", "3"},
- {"4", "4"},
- {"5", "5"},
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(2);
-
- // first file
- // one chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- // another chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
- statement.execute("FLUSH");
-
- // second file
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(5, 5)");
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge Finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /**
- * first file has only one page for each chunk and only one chunk for each time series second file
- * has two pages for each chunk and only one chunk for each time series
- */
- @Test
- public void test5() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"},
- {"3", "3"},
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(10000);
-
- // first file
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("FLUSH");
-
- // second file
- // two pages for one chunk
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge Finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /**
- * first file has only one page for each chunk and two chunks for each time series second file has
- * two pages for each chunk and only one chunk for each time series
- */
- @Test
- public void test6() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"},
- {"3", "3"},
- {"4", "4"},
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(1);
-
- // first file
- // two chunks
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- statement.execute("FLUSH");
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(2);
- // second file
- // two pages for one chunk
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
-
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge Finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /**
- * first file has two pages for each chunk and only one chunk for each time series second file has
- * two pages for each chunk and only one chunk for each time series
- */
- @Test
- public void test7() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"},
- {"3", "3"},
- {"4", "4"}
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(10000);
-
- // first file
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- statement.execute("FLUSH");
-
- // second file
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge Finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /**
- * first file has two pages for each chunk and two chunks for each time series second file has two
- * pages for each chunk and only one chunk for each time series
- */
- @Test
- public void test8() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"},
- {"3", "3"},
- {"4", "4"},
- {"5", "5"},
- {"6", "6"}
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(2);
-
- // first file
- // one chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- // another chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
- statement.execute("FLUSH");
-
- // second file
- // two pages for one chunk
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(5, 5)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(6, 6)");
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge Finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /**
- * first file has only one page for each chunk and only one chunk for each time series second file
- * has only one page for each chunk and two chunks for each time series
- */
- @Test
- public void test9() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"},
- {"3", "3"},
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(10000);
-
- // first file
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("FLUSH");
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(1);
- // second file
- // two pages for one chunk
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge Finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /**
- * first file has only one page for each chunk and two chunks for each time series second file has
- * only one page for each chunk and two chunks for each time series
- */
- @Test
- public void test10() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"},
- {"3", "3"},
- {"4", "4"}
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(1);
-
- // first file
- // two chunks
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- statement.execute("FLUSH");
-
- // second file
- // two pages for one chunk
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge Finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
-
- try (ResultSet resultSet =
- statement.executeQuery("SELECT count(s1) FROM root.sg1.d1 where time < 4")) {
- assertTrue(resultSet.next());
- assertEquals(3L, resultSet.getLong("count(root.sg1.d1.s1)"));
- }
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /**
- * first file has two pages for each chunk and only one chunk for each time series second file has
- * only one page for each chunk and two chunks for each time series
- */
- @Test
- public void test11() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"},
- {"3", "3"},
- {"4", "4"}
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(10000);
-
- // first file
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- statement.execute("FLUSH");
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(1);
- // second file
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge Finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /**
- * first file has two pages for each chunk and two chunks for each time series second file has
- * only one page for each chunk and two chunks for each time series
- */
- @Test
- public void test12() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"},
- {"3", "3"},
- {"4", "4"},
- {"5", "5"},
- {"6", "6"}
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(2);
- // first file
- // one chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- // another chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
- statement.execute("FLUSH");
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(1);
- // second file
- // two pages for one chunk
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(5, 5)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(6, 6)");
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge Finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /**
- * first file has only one page for each chunk and only one chunk for each time series second file
- * has two pages for each chunk and two chunks for each time series
- */
- @Test
- public void test13() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"},
- {"3", "3"},
- {"4", "4"},
- {"5", "5"}
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(10000);
-
- // first file
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("FLUSH");
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(2);
- // second file
- // one chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
- // another chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(5, 5)");
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge Finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /**
- * first file has only one page for each chunk and two chunks for each time series second file has
- * two pages for each chunk and two chunks for each time series
- */
- @Test
- public void test14() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"},
- {"3", "3"},
- {"4", "4"},
- {"5", "5"},
- {"6", "6"}
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(1);
-
- // first file
- // two chunks
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- statement.execute("FLUSH");
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(2);
- // second file
- // one chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
- // another chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(5, 5)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(6, 6)");
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge Finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /**
- * first file has two pages for each chunk and only one chunk for each time series second file has
- * two pages for each chunk and two chunks for each time series
- */
- @Test
- public void test15() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"},
- {"3", "3"},
- {"4", "4"},
- {"5", "5"},
- {"6", "6"}
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(10000);
-
- // first file
- // two pages for one chunk
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- statement.execute("FLUSH");
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(2);
- // second file
- // one chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
- // another chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(5, 5)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(6, 6)");
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge Finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /**
- * first file has two pages for each chunk and two chunks for each time series second file has two
- * pages for each chunk and two chunks for each time series
- */
- @Test
- public void test16() throws SQLException {
- String[][] retArray = {
- {"1", "1"},
- {"2", "2"},
- {"3", "3"},
- {"4", "4"},
- {"5", "5"},
- {"6", "6"},
- {"7", "7"},
- {"8", "8"}
- };
- int preAvgSeriesPointNumberThreshold =
- IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement()) {
-
- IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(2);
- // first file
- // one chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(1, 1)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(2, 2)");
- // another chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(3, 3)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(4, 4)");
- statement.execute("FLUSH");
-
- // second file
- // one chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(5, 5)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(6, 6)");
- // another chunk with two pages
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(7, 7)");
- statement.execute("INSERT INTO root.sg1.d1(time,s1) values(8, 8)");
- statement.execute("FLUSH");
-
- LOGGER.warn("Waiting for merge to finish");
- assertTrue(waitForMergeFinish());
- LOGGER.warn("Merge Finish");
-
- int cnt;
- try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg1.d1")) {
- cnt = 0;
- while (resultSet.next()) {
- long time = resultSet.getLong("Time");
- float s1 = resultSet.getFloat("root.sg1.d1.s1");
- assertEquals(Long.parseLong(retArray[cnt][0]), time);
- assertEquals(Float.parseFloat(retArray[cnt][1]), s1, FLOAT_DELTA);
- cnt++;
- }
- }
- assertEquals(retArray.length, cnt);
- } catch (StorageEngineException | InterruptedException e) {
- e.printStackTrace();
- fail();
- } finally {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
- }
- }
-
- /** wait until merge is finished */
- private boolean waitForMergeFinish() throws StorageEngineException, InterruptedException {
- StorageGroupProcessor storageGroupProcessor =
- StorageEngine.getInstance().getProcessor(storageGroupPath);
- LevelCompactionTsFileManagement tsFileManagement =
- (LevelCompactionTsFileManagement) storageGroupProcessor.getTsFileManagement();
-
- long startTime = System.nanoTime();
- long intervalTime = startTime;
- // get the size of level 1's tsfile list to judge whether merge is finished
- while (tsFileManagement.getSequenceTsFileResources().get(0L).size() < 2
- || tsFileManagement.getSequenceTsFileResources().get(0L).get(1).size() != 1) {
- TimeUnit.MILLISECONDS.sleep(100);
- // wait too long, just break
- if ((System.nanoTime() - startTime) >= MAX_WAIT_TIME_FOR_MERGE) {
- LOGGER.error("Unable to wait for compaction finish");
- fail();
- break;
- }
- if ((System.nanoTime() - intervalTime) >= 20L * 1000L * 1000L * 1000L) {
- intervalTime = System.nanoTime();
- LOGGER.warn(
- "The number of tsfile level: {}",
- tsFileManagement.getSequenceTsFileResources().get(0L).size());
- LOGGER.warn(
- "The number of tsfile in level 0: {}",
- tsFileManagement.getSequenceTsFileResources().get(0L).get(0).size());
- LOGGER.warn(
- "The number of tsfile in level 1: {}",
- tsFileManagement.getSequenceTsFileResources().get(0L).get(1).size());
- LOGGER.warn(
- "The number of current compaction task num {}",
- CompactionMergeTaskPoolManager.getInstance().getCompactionTaskNum());
- }
- }
- return tsFileManagement.getSequenceTsFileResources().get(0L).get(1).size() == 1;
- }
-}
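
The deleted test's waitForMergeFinish method shows a pattern worth keeping in mind for any replacement IT: poll the compaction state with a hard deadline so a hung merge fails the test instead of hanging the build. Below is a generic, standalone sketch of that deadline-bounded polling loop, similar in spirit to the deleted code but without the IoTDB-specific state inspection; awaitCondition and PollUntil are hypothetical names, not IoTDB or JUnit APIs.

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper: repeatedly evaluates 'condition' until it is true or the
// deadline passes, sleeping briefly between checks.
public final class PollUntil {

  public static boolean awaitCondition(BooleanSupplier condition, long timeout, TimeUnit unit)
      throws InterruptedException {
    long deadline = System.nanoTime() + unit.toNanos(timeout);
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() >= deadline) {
        return false; // caller decides whether to fail() the test
      }
      TimeUnit.MILLISECONDS.sleep(100);
    }
    return true;
  }

  private PollUntil() {}
}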