You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/06/27 14:10:18 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: add more javadocs and Deprecated annotation

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new b1a531d  add more javadocs and Deprecated annotation
     new 25f7abe  Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
b1a531d is described below

commit b1a531dddc428a53fad32683f9aea88e5c98b9a6
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Jun 27 22:09:52 2019 +0800

    add more javadocs and Deprecated annotation
---
 .../java/org/apache/iotdb/db/engine/Processor.java |   1 +
 .../apache/iotdb/db/engine/memtable/IMemTable.java |   4 -
 .../db/engine/memtable/IWritableMemChunk.java      |  14 ++
 .../db/engine/memtable/MemTableFlushTaskV3.java    | 262 ---------------------
 .../iotdb/db/engine/memtable/MemTablePool.java     |  38 +--
 .../org/apache/iotdb/db/monitor/StatMonitor.java   |   1 -
 6 files changed, 34 insertions(+), 286 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
index 7bc9693..eb0028a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
  *
  * @see BufferWriteProcessor
  */
+@Deprecated
 public abstract class Processor {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 594dbd2..f6395f8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -22,11 +22,7 @@ import java.util.Map;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 
 /**
  * IMemTable is designed to store data points which are not flushed into TsFile yet. An instance of
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 4888b6b..cf8ccbb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -44,11 +44,25 @@ public interface IWritableMemChunk extends TimeValuePairSorter {
 
   TSDataType getType();
 
+  /**
+   * Uses an offset to mark which data is deleted:
+   * data whose timestamp is less than the offset is deleted.
+   * @param offset the timestamp below which data is considered deleted
+   */
   void setTimeOffset(long offset);
 
   void releasePrimitiveArrayList();
 
+  /**
+   * Used when flushing data to disk.
+   * This method removes duplicated data and sorts it.
+   * @return the deduplicated and sorted data; the default implementation returns {@code null}
+   */
   default DeduplicatedSortedData getDeduplicatedSortedData(){return null;}
 
+  /**
+   * Serves query requests.
+   * @return the sorted {@code TVList}; the default implementation returns {@code null}
+   */
   default TVList getSortedTVList(){return null;}
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV3.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV3.java
deleted file mode 100644
index 12e00d2..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV3.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements.  See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with the License.  You may obtain
- * a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied.  See the License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.memtable;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Future;
-import java.util.function.Consumer;
-import org.apache.iotdb.db.engine.pool.FlushSubTaskPoolManager;
-import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
-import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
-import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MemTableFlushTaskV3 {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
-  private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte;
-  private static final FlushSubTaskPoolManager subTaskPoolManager = FlushSubTaskPoolManager
-      .getInstance();
-  private Future memoryFlushTask;
-  private Future ioFlushTask;
-  private NativeRestorableIOWriter tsFileIoWriter;
-
-  private ConcurrentLinkedQueue ioTaskQueue = new ConcurrentLinkedQueue();
-  private ConcurrentLinkedQueue memoryTaskQueue = new ConcurrentLinkedQueue();
-  private volatile boolean stop = false;
-  private String storageGroup;
-
-  private Consumer<IMemTable> flushCallBack;
-  private IMemTable memTable;
-
-  public MemTableFlushTaskV3(NativeRestorableIOWriter writer, String storageGroup,
-      Consumer<IMemTable> callBack) {
-    this.tsFileIoWriter = writer;
-    this.storageGroup = storageGroup;
-    this.flushCallBack = callBack;
-//    this.memoryFlushTask = subTaskPoolManager.submit(memoryFlushThread);
-
-    memoryFlushThread.start();
-    ioFlushThread.start();
-    LOGGER.info("flush task created in Storage group {} ", storageGroup);
-  }
-
-
-  private Thread memoryFlushThread = new Thread(() -> {
-      long memSerializeTime = 0;
-      LOGGER.info("Storage group {},start serialize data into mem.", storageGroup);
-      while (!stop) {
-        if (!memoryTaskQueue.isEmpty()) {
-          LOGGER.info("memory task queue is {}", memoryTaskQueue);
-        }
-        Object task = memoryTaskQueue.poll();
-        if (task == null) {
-          try {
-            Thread.sleep(10);
-          } catch (InterruptedException e) {
-            LOGGER.error("Storage group {}, io flush task is interrupted.", storageGroup, e);
-          }
-        } else {
-          if (task instanceof String) {
-            LOGGER.info("add String {} to io queue", task);
-            ioTaskQueue.add(task);
-          } else if (task instanceof Pair) {
-            LOGGER.info("add chunk writer {}", task);
-            long starTime = System.currentTimeMillis();
-            Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
-            ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
-            IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
-                PAGE_SIZE_THRESHOLD);
-            try {
-              writeOneSeries(memorySerializeTask.left, seriesWriter,
-                  memorySerializeTask.right.getType());
-              ioTaskQueue.add(seriesWriter);
-            } catch (IOException e) {
-              LOGGER.error("Storage group {}, io error.", storageGroup, e);
-              throw new RuntimeException(e);
-            }
-            memSerializeTime += System.currentTimeMillis() - starTime;
-          } else {
-            LOGGER.info("end chunk group {} io task to io task queue", task.toString());
-            ioTaskQueue.add(task);
-          }
-        }
-      }
-      LOGGER.info("Storage group {}, flushing a memtable into disk: serialize data into mem cost {} ms.",
-          storageGroup, memSerializeTime);
-  }, Thread.currentThread().getId() + "-1");
-
-
-  //TODO a better way is: for each TsFile, assign it a Executors.singleThreadPool,
-  // rather than per each memtable.
-  private Thread ioFlushThread = new Thread(() -> {
-      long ioTime = 0;
-      LOGGER.info("Storage group {}, start io cost.", storageGroup);
-      while (!stop) {
-        Object seriesWriterOrEndChunkGroupTask = ioTaskQueue.poll();
-        if (seriesWriterOrEndChunkGroupTask == null) {
-          try {
-            Thread.sleep(10);
-          } catch (InterruptedException e) {
-            LOGGER.error("Storage group {}, io flush task is interrupted.", storageGroup, e);
-          }
-        } else {
-          long starTime = System.currentTimeMillis();
-          try {
-            if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
-              LOGGER.info("write series to disk");
-              ((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
-            } else if (seriesWriterOrEndChunkGroupTask instanceof String) {
-              LOGGER.info("start chunk group");
-              tsFileIoWriter.startChunkGroup((String) seriesWriterOrEndChunkGroupTask);
-            } else {
-              LOGGER.info("end chunk group {} io task from task queue", seriesWriterOrEndChunkGroupTask.toString());
-              LOGGER.info("end chunk group");
-              ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
-              tsFileIoWriter.endChunkGroup(task.version);
-              task.finished = true;
-            }
-          } catch (IOException e) {
-            LOGGER.error("Storage group {}, io error.", storageGroup, e);
-            throw new RuntimeException(e);
-          }
-          ioTime += System.currentTimeMillis() - starTime;
-        }
-      }
-      LOGGER.info("flushing a memtable in storage group {}, cost {}ms", storageGroup, ioTime);
-  }, Thread.currentThread().getId() + "-2");
-
-
-  private void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
-      TSDataType dataType)
-      throws IOException {
-    for (TimeValuePair timeValuePair : tvPairs) {
-      switch (dataType) {
-        case BOOLEAN:
-          seriesWriterImpl
-              .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
-          break;
-        case INT32:
-          seriesWriterImpl.write(timeValuePair.getTimestamp(),
-              timeValuePair.getValue().getInt());
-          break;
-        case INT64:
-          seriesWriterImpl.write(timeValuePair.getTimestamp(),
-              timeValuePair.getValue().getLong());
-          break;
-        case FLOAT:
-          seriesWriterImpl.write(timeValuePair.getTimestamp(),
-              timeValuePair.getValue().getFloat());
-          break;
-        case DOUBLE:
-          seriesWriterImpl
-              .write(timeValuePair.getTimestamp(),
-                  timeValuePair.getValue().getDouble());
-          break;
-        case TEXT:
-          seriesWriterImpl
-              .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
-          break;
-        default:
-          LOGGER.error("Storage group {}, don't support data type: {}", storageGroup,
-              dataType);
-          break;
-      }
-    }
-  }
-
-  /**
-   * the function for flushing memtable.
-   */
-  public void flushMemTable(FileSchema fileSchema, IMemTable imemTable) {
-    long sortTime = 0;
-    ChunkGroupIoTask theLastTask = EMPTY_TASK;
-    this.memTable = imemTable;
-    LOGGER.info("Current thread id is {}" , Thread.currentThread().getId());
-    for (String deviceId : imemTable.getMemTableMap().keySet()) {
-      memoryTaskQueue.add(deviceId);
-      int seriesNumber = imemTable.getMemTableMap().get(deviceId).size();
-      LOGGER.info("series number: {}", seriesNumber);
-      LOGGER.info("add device, memory queue {}", memoryTaskQueue);
-      for (String measurementId : imemTable.getMemTableMap().get(deviceId).keySet()) {
-        long startTime = System.currentTimeMillis();
-        // TODO if we can not use TSFileIO writer, then we have to redesign the class of TSFileIO.
-        IWritableMemChunk series = imemTable.getMemTableMap().get(deviceId).get(measurementId);
-        MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
-        List<TimeValuePair> sortedTimeValuePairs = series.getSortedTimeValuePairList();
-        sortTime += System.currentTimeMillis() - startTime;
-        LOGGER.info("add seies writer in flush thread {}", sortedTimeValuePairs);
-        memoryTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
-        LOGGER.info("add series writer, memory queue {}", memoryTaskQueue);
-      }
-      theLastTask = new ChunkGroupIoTask(seriesNumber, deviceId, imemTable.getVersion());
-      LOGGER.info("ChunkGroupIoTask task {}", theLastTask.toString());
-      memoryTaskQueue.add(theLastTask);
-      LOGGER.info("add chunk group to task, memory queue {}", memoryTaskQueue);
-    }
-    LOGGER.info(
-        "{}, flushing a memtable into disk: data sort time cost {} ms.",
-        storageGroup, sortTime);
-    while (!theLastTask.finished) {
-      try {
-        Thread.sleep(10);
-      } catch (InterruptedException e) {
-        LOGGER.error("Storage group {}, flush memtable table thread is interrupted.",
-            storageGroup, e);
-        throw new RuntimeException(e);
-      }
-    }
-    stop = true;
-
-    while (ioFlushThread.isAlive()) {
-    }
-
-    LOGGER.info("flushing a memtable finished!");
-    flushCallBack.accept(memTable);
-  }
-
-
-  static class ChunkGroupIoTask {
-
-    int seriesNumber;
-    String deviceId;
-    long version;
-    volatile boolean finished;
-
-    public ChunkGroupIoTask(int seriesNumber, String deviceId, long version) {
-      this(seriesNumber, deviceId, version, false);
-    }
-
-    public ChunkGroupIoTask(int seriesNumber, String deviceId, long version, boolean finished) {
-      this.seriesNumber = seriesNumber;
-      this.deviceId = deviceId;
-      this.version = version;
-      this.finished = finished;
-    }
-  }
-
-  private static ChunkGroupIoTask EMPTY_TASK = new ChunkGroupIoTask(0, "", 0, true);
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index 4902ca6..31ec7ac 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -28,7 +28,7 @@ public class MemTablePool {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(MemTablePool.class);
 
-  private static final Deque<IMemTable> emptyMemTables = new ArrayDeque<>();
+  private static final Deque<IMemTable> availableMemTables = new ArrayDeque<>();
 
   /**
    * >= number of storage group * 2
@@ -45,30 +45,30 @@ public class MemTablePool {
   }
 
   public IMemTable getEmptyMemTable(Object applier) {
-    synchronized (emptyMemTables) {
-      if (emptyMemTables.isEmpty() && size < capacity) {
+    synchronized (availableMemTables) {
+      if (availableMemTables.isEmpty() && size < capacity) {
         size++;
         LOGGER.info("generated a new memtable for {}, system memtable size: {}, stack size: {}",
-            applier, size, emptyMemTables.size());
+            applier, size, availableMemTables.size());
         return new PrimitiveMemTable();
-      } else if (!emptyMemTables.isEmpty()) {
+      } else if (!availableMemTables.isEmpty()) {
         LOGGER
             .info("system memtable size: {}, stack size: {}, then get a memtable from stack for {}",
-                size, emptyMemTables.size(), applier);
-        return emptyMemTables.pop();
+                size, availableMemTables.size(), applier);
+        return availableMemTables.pop();
       }
 
       // wait until some one has released a memtable
       int waitCount = 1;
       while (true) {
-        if (!emptyMemTables.isEmpty()) {
+        if (!availableMemTables.isEmpty()) {
           LOGGER.info(
               "system memtable size: {}, stack size: {}, then get a memtable from stack for {}",
-              size, emptyMemTables.size(), applier);
-          return emptyMemTables.pop();
+              size, availableMemTables.size(), applier);
+          return availableMemTables.pop();
         }
         try {
-          emptyMemTables.wait(WAIT_TIME);
+          availableMemTables.wait(WAIT_TIME);
         } catch (InterruptedException e) {
           LOGGER.error("{} fails to wait fot memtables {}, continue to wait", applier, e);
         }
@@ -78,20 +78,20 @@ public class MemTablePool {
   }
 
   public void putBack(IMemTable memTable) {
-    synchronized (emptyMemTables) {
+    synchronized (availableMemTables) {
       memTable.clear();
-      emptyMemTables.push(memTable);
-      emptyMemTables.notify();
-      LOGGER.info("a memtable returned, stack size {}", emptyMemTables.size());
+      availableMemTables.push(memTable);
+      availableMemTables.notify();
+      LOGGER.info("a memtable returned, stack size {}", availableMemTables.size());
     }
   }
 
   public void putBack(IMemTable memTable, String storageGroup) {
-    synchronized (emptyMemTables) {
+    synchronized (availableMemTables) {
       memTable.clear();
-      emptyMemTables.push(memTable);
-      emptyMemTables.notify();
-      LOGGER.info("{} return a memtable, stack size {}", storageGroup, emptyMemTables.size());
+      availableMemTables.push(memTable);
+      availableMemTables.notify();
+      LOGGER.info("{} return a memtable, stack size {}", storageGroup, availableMemTables.size());
     }
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index 7a6b0cb..69f0a95 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.filenodeV2.FileNodeManagerV2;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.exception.MetadataArgsErrorException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;