You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/11/21 12:26:50 UTC
[incubator-iotdb] branch dev_new_merge_strategy updated: fix ChunkProviderExecutor
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_new_merge_strategy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/dev_new_merge_strategy by this push:
new f83a05b fix ChunkProviderExecutor
f83a05b is described below
commit f83a05b726001503839f824a5a331aedca1c5a84
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Nov 21 20:26:32 2019 +0800
fix ChunkProviderExecutor
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../java/org/apache/iotdb/db/service/IoTDB.java | 2 +-
.../apache/iotdb/db/engine/merge/MergeTest.java | 4 +++
.../storagegroup/StorageGroupProcessorTest.java | 6 +++++
.../apache/iotdb/db/utils/EnvironmentUtils.java | 4 +++
.../iotdb/tsfile/read/TsFileSequenceReader.java | 2 +-
.../read/common/util/ChunkProviderExecutor.java | 29 ++++++----------------
7 files changed, 25 insertions(+), 24 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0b0fe68..e12d281 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -375,7 +375,7 @@ public class IoTDBConfig {
*/
private int chunkMergePointThreshold = 20480;
- private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.INPLACE_MAX_SERIES_NUM;
+ private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.SQUEEZE_MAX_FILE_NUM;
/**
* Default system file storage is in local file system (unsupported)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index d99bc69..815436e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -105,7 +105,6 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(MergeManager.getINSTANCE());
registerManager.register(CacheHitRatioMonitor.getInstance());
registerManager.register(MetricsService.getInstance());
- registerManager.register(ChunkProviderExecutor.getINSTANCE());
JMXService.registerMBean(getInstance(), mbeanName);
logger.info("IoTDB is set up.");
@@ -115,6 +114,7 @@ public class IoTDB implements IoTDBMBean {
logger.info("Deactivating IoTDB...");
registerManager.deregisterAll();
JMXService.deregisterMBean(mbeanName);
+ ChunkProviderExecutor.getINSTANCE().close();
logger.info("IoTDB is deactivated.");
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
index e3dad01..8222cbc 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
@@ -42,6 +42,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.tsfile.read.common.util.ChunkProvider;
+import org.apache.iotdb.tsfile.read.common.util.ChunkProviderExecutor;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
@@ -78,6 +80,7 @@ public abstract class MergeTest {
prepareSeries();
prepareFiles(seqFileNum, unseqFileNum);
MergeManager.getINSTANCE().start();
+ ChunkProviderExecutor.getINSTANCE().start();
}
@After
@@ -91,6 +94,7 @@ public abstract class MergeTest {
MManager.getInstance().clear();
EnvironmentUtils.cleanAllDir();
MergeManager.getINSTANCE().stop();
+ ChunkProviderExecutor.getINSTANCE().close();
}
private void prepareSeries() throws MetadataErrorException, PathErrorException {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 4bae164..1d9502b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -25,7 +25,9 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.ArrayList;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.merge.MergeFileStrategy;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -36,6 +38,7 @@ import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.JobFileManager;
+import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.record.TSRecord;
@@ -180,6 +183,8 @@ public class StorageGroupProcessorTest {
@Test
public void testMerge() throws QueryProcessorException {
+ MergeFileStrategy strategy = IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy();
+ IoTDBDescriptor.getInstance().getConfig().setMergeFileStrategy(MergeFileStrategy.INPLACE_MAX_SERIES_NUM);
mergeLock = new AtomicLong(0);
for (int j = 21; j <= 30; j++) {
@@ -213,6 +218,7 @@ public class StorageGroupProcessorTest {
for (TsFileResource resource : queryDataSource.getUnseqResources()) {
Assert.assertTrue(resource.isClosed());
}
+ IoTDBDescriptor.getInstance().getConfig().setMergeFileStrategy(strategy);
}
class DummySGP extends StorageGroupProcessor {
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 44a4256..4e01348 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
+import org.apache.iotdb.tsfile.read.common.util.ChunkProviderExecutor;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,6 +98,8 @@ public class EnvironmentUtils {
config.setTsFileSizeThreshold(oldTsFileThreshold);
config.setMemtableSizeThreshold(oldGroupSizeInByte);
IoTDBConfigDynamicAdapter.getInstance().reset();
+
+ ChunkProviderExecutor.getINSTANCE().close();
}
public static void cleanAllDir() throws IOException {
@@ -161,6 +164,7 @@ public class EnvironmentUtils {
FlushManager.getInstance().start();
TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
+ ChunkProviderExecutor.getINSTANCE().start();
}
private static void createAllDir() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index f1a9cc3..a23829e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -272,7 +272,7 @@ public class TsFileSequenceReader implements AutoCloseable {
if (cacheDeviceMetadata) {
deviceMetadata = deviceMetadataMap.get(index);
}
- if (deviceMetadata == null) {
+ if (deviceMetadata == null && index != null) {
deviceMetadata = TsDeviceMetadata.deserializeFrom(readData(index.getOffset()
, index.getLen()));
if (cacheDeviceMetadata) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProviderExecutor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProviderExecutor.java
index 295e479..04a1687 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProviderExecutor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProviderExecutor.java
@@ -22,18 +22,15 @@ package org.apache.iotdb.tsfile.read.common.util;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.service.IService;
-import org.apache.iotdb.db.service.ServiceType;
-public class ChunkProviderExecutor implements IService {
+public class ChunkProviderExecutor {
private static ChunkProviderExecutor INSTANCE = new ChunkProviderExecutor();
- private ExecutorService providerThreadPool =
- Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ private ExecutorService providerThreadPool;
private ChunkProviderExecutor() {
+ start();
}
public static ChunkProviderExecutor getINSTANCE() {
@@ -44,25 +41,15 @@ public class ChunkProviderExecutor implements IService {
providerThreadPool.submit(providerTask);
}
+ public void start() {
+ providerThreadPool =
+ Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ }
+
public void close() {
providerThreadPool.shutdownNow();
while (!providerThreadPool.isTerminated()) {
// wait
}
}
-
- @Override
- public void start() throws StartupException {
-
- }
-
- @Override
- public void stop() {
- close();
- }
-
- @Override
- public ServiceType getID() {
- return ServiceType.CHUNK_PROVIDER_SERVICE;
- }
}
\ No newline at end of file