You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/11/21 12:26:50 UTC

[incubator-iotdb] branch dev_new_merge_strategy updated: fix ChunkProviderExecutor

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_new_merge_strategy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dev_new_merge_strategy by this push:
     new f83a05b  fix ChunkProviderExecutor
f83a05b is described below

commit f83a05b726001503839f824a5a331aedca1c5a84
Author: jt2594838 <jt...@163.com>
AuthorDate: Thu Nov 21 20:26:32 2019 +0800

    fix ChunkProviderExecutor
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  2 +-
 .../apache/iotdb/db/engine/merge/MergeTest.java    |  4 +++
 .../storagegroup/StorageGroupProcessorTest.java    |  6 +++++
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |  4 +++
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |  2 +-
 .../read/common/util/ChunkProviderExecutor.java    | 29 ++++++----------------
 7 files changed, 25 insertions(+), 24 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0b0fe68..e12d281 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -375,7 +375,7 @@ public class IoTDBConfig {
    */
   private int chunkMergePointThreshold = 20480;
 
-  private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.INPLACE_MAX_SERIES_NUM;
+  private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.SQUEEZE_MAX_FILE_NUM;
 
   /**
    * Default system file storage is in local file system (unsupported)
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index d99bc69..815436e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -105,7 +105,6 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(MergeManager.getINSTANCE());
     registerManager.register(CacheHitRatioMonitor.getInstance());
     registerManager.register(MetricsService.getInstance());
-    registerManager.register(ChunkProviderExecutor.getINSTANCE());
     JMXService.registerMBean(getInstance(), mbeanName);
 
     logger.info("IoTDB is set up.");
@@ -115,6 +114,7 @@ public class IoTDB implements IoTDBMBean {
     logger.info("Deactivating IoTDB...");
     registerManager.deregisterAll();
     JMXService.deregisterMBean(mbeanName);
+    ChunkProviderExecutor.getINSTANCE().close();
     logger.info("IoTDB is deactivated.");
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
index e3dad01..8222cbc 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
@@ -42,6 +42,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.tsfile.read.common.util.ChunkProvider;
+import org.apache.iotdb.tsfile.read.common.util.ChunkProviderExecutor;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
@@ -78,6 +80,7 @@ public abstract class MergeTest {
     prepareSeries();
     prepareFiles(seqFileNum, unseqFileNum);
     MergeManager.getINSTANCE().start();
+    ChunkProviderExecutor.getINSTANCE().start();
   }
 
   @After
@@ -91,6 +94,7 @@ public abstract class MergeTest {
     MManager.getInstance().clear();
     EnvironmentUtils.cleanAllDir();
     MergeManager.getINSTANCE().stop();
+    ChunkProviderExecutor.getINSTANCE().close();
   }
 
   private void prepareSeries() throws MetadataErrorException, PathErrorException {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 4bae164..1d9502b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -25,7 +25,9 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.ArrayList;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.merge.MergeFileStrategy;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 
@@ -36,6 +38,7 @@ import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.JobFileManager;
+import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
@@ -180,6 +183,8 @@ public class StorageGroupProcessorTest {
 
   @Test
   public void testMerge() throws QueryProcessorException {
+    MergeFileStrategy strategy = IoTDBDescriptor.getInstance().getConfig().getMergeFileStrategy();
+    IoTDBDescriptor.getInstance().getConfig().setMergeFileStrategy(MergeFileStrategy.INPLACE_MAX_SERIES_NUM);
 
     mergeLock = new AtomicLong(0);
     for (int j = 21; j <= 30; j++) {
@@ -213,6 +218,7 @@ public class StorageGroupProcessorTest {
     for (TsFileResource resource : queryDataSource.getUnseqResources()) {
       Assert.assertTrue(resource.isClosed());
     }
+    IoTDBDescriptor.getInstance().getConfig().setMergeFileStrategy(strategy);
   }
 
   class DummySGP extends StorageGroupProcessor {
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 44a4256..4e01348 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
+import org.apache.iotdb.tsfile.read.common.util.ChunkProviderExecutor;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,6 +98,8 @@ public class EnvironmentUtils {
     config.setTsFileSizeThreshold(oldTsFileThreshold);
     config.setMemtableSizeThreshold(oldGroupSizeInByte);
     IoTDBConfigDynamicAdapter.getInstance().reset();
+
+    ChunkProviderExecutor.getINSTANCE().close();
   }
 
   public static void cleanAllDir() throws IOException {
@@ -161,6 +164,7 @@ public class EnvironmentUtils {
     FlushManager.getInstance().start();
     TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
     TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
+    ChunkProviderExecutor.getINSTANCE().start();
   }
 
   private static void createAllDir() {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index f1a9cc3..a23829e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -272,7 +272,7 @@ public class TsFileSequenceReader implements AutoCloseable {
     if (cacheDeviceMetadata) {
       deviceMetadata = deviceMetadataMap.get(index);
     }
-    if (deviceMetadata == null) {
+    if (deviceMetadata == null && index != null) {
       deviceMetadata = TsDeviceMetadata.deserializeFrom(readData(index.getOffset()
           , index.getLen()));
       if (cacheDeviceMetadata) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProviderExecutor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProviderExecutor.java
index 295e479..04a1687 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProviderExecutor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/util/ChunkProviderExecutor.java
@@ -22,18 +22,15 @@ package org.apache.iotdb.tsfile.read.common.util;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import org.apache.iotdb.db.exception.StartupException;
-import org.apache.iotdb.db.service.IService;
-import org.apache.iotdb.db.service.ServiceType;
 
-public class ChunkProviderExecutor implements IService {
+public class ChunkProviderExecutor {
 
   private static ChunkProviderExecutor INSTANCE = new ChunkProviderExecutor();
 
-  private ExecutorService providerThreadPool =
-      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+  private ExecutorService providerThreadPool;
 
   private ChunkProviderExecutor() {
+    start();
   }
 
   public static ChunkProviderExecutor getINSTANCE() {
@@ -44,25 +41,15 @@ public class ChunkProviderExecutor implements IService {
     providerThreadPool.submit(providerTask);
   }
 
+  public void start() {
+    providerThreadPool =
+        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+  }
+
   public void close() {
     providerThreadPool.shutdownNow();
     while (!providerThreadPool.isTerminated()) {
       // wait
     }
   }
-
-  @Override
-  public void start() throws StartupException {
-
-  }
-
-  @Override
-  public void stop() {
-    close();
-  }
-
-  @Override
-  public ServiceType getID() {
-    return ServiceType.CHUNK_PROVIDER_SERVICE;
-  }
 }
\ No newline at end of file