You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ka...@apache.org on 2019/06/12 01:58:52 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated (63f4311 -> c3bbccf)

This is an automated email from the ASF dual-hosted git repository.

kangrong pushed a change to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from 63f4311  add log
     new d4ee647  add overflow IO
     new c9b8dad  redo change
     new c3bbccf  Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iotdb/db/engine/overflow/io/OverflowIO.java    |   1 +
 .../db/engine/overflow/io/OverflowMemtable.java    |  26 ++--
 .../db/engine/overflow/io/OverflowProcessor.java   | 167 +++++++++++++++------
 .../db/engine/overflow/io/OverflowResource.java    |  16 +-
 .../engine/overflow/io/OverflowResourceTest.java   |   3 +-
 5 files changed, 145 insertions(+), 68 deletions(-)


[incubator-iotdb] 02/03: redo change

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kangrong pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit c9b8dad1a552070674d97fd27d61b74089e9ad55
Author: kr11 <30...@qq.com>
AuthorDate: Wed Jun 12 09:52:58 2019 +0800

    redo change
---
 .../db/engine/overflow/io/OverflowMemtable.java    | 80 +++++++++++-----------
 1 file changed, 40 insertions(+), 40 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java
index 46231fc..91ef235 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java
@@ -38,44 +38,44 @@ import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
  */
 public class OverflowMemtable extends PrimitiveMemTable {
 
-//  /**
-//   * store update and delete data
-//   */
-//  private Map<String, Map<String, LongStatistics>> indexTrees;
-//
-//  /**
-//   * store insert data
-//   */
-//  private IMemTable memTable;
-//
-//  public OverflowMemtable() {
-//    indexTrees = new HashMap<>();
-//    memTable = new PrimitiveMemTable();
-//  }
-//
-//  public void insert(TSRecord tsRecord) {
-//    for (DataPoint dataPoint : tsRecord.dataPointList) {
-//      memTable.write(tsRecord.deviceId, dataPoint.getMeasurementId(), dataPoint.getType(),
-//              tsRecord.time,
-//              dataPoint.getValue().toString());
-//    }
-//  }
-//
-//  /**
-//   * @deprecated update time series data
-//   */
-//  @Deprecated
-//  public void update(String deviceId, String measurementId, long startTime, long endTime,
-//                     TSDataType dataType,
-//                     byte[] value) {
-//    if (!indexTrees.containsKey(deviceId)) {
-//      indexTrees.put(deviceId, new HashMap<>());
-//    }
-//    if (!indexTrees.get(deviceId).containsKey(measurementId)) {
-//      indexTrees.get(deviceId).put(measurementId, new LongStatistics());
-//    }
-//    indexTrees.get(deviceId).get(measurementId).updateStats(startTime, endTime);
-//  }
+  /**
+   * store update and delete data
+   */
+  private Map<String, Map<String, LongStatistics>> indexTrees;
+
+  /**
+   * store insert data
+   */
+  private IMemTable memTable;
+
+  public OverflowMemtable() {
+    indexTrees = new HashMap<>();
+    memTable = new PrimitiveMemTable();
+  }
+
+  public void insert(TSRecord tsRecord) {
+    for (DataPoint dataPoint : tsRecord.dataPointList) {
+      memTable.write(tsRecord.deviceId, dataPoint.getMeasurementId(), dataPoint.getType(),
+              tsRecord.time,
+              dataPoint.getValue().toString());
+    }
+  }
+
+  /**
+   * @deprecated update time series data
+   */
+  @Deprecated
+  public void update(String deviceId, String measurementId, long startTime, long endTime,
+                     TSDataType dataType,
+                     byte[] value) {
+    if (!indexTrees.containsKey(deviceId)) {
+      indexTrees.put(deviceId, new HashMap<>());
+    }
+    if (!indexTrees.get(deviceId).containsKey(measurementId)) {
+      indexTrees.get(deviceId).put(measurementId, new LongStatistics());
+    }
+    indexTrees.get(deviceId).get(measurementId).updateStats(startTime, endTime);
+  }
 
   public void delete(String deviceId, String measurementId, long timestamp, boolean isFlushing) {
     super.delete(deviceId, measurementId, timestamp);
@@ -95,7 +95,7 @@ public class OverflowMemtable extends PrimitiveMemTable {
 //  }
 
   public boolean isEmptyOfMemTable() {
-    return this.isEmpty();
+    return super.isEmpty();
   }
 
   public IMemTable getMemTabale() {
@@ -109,6 +109,6 @@ public class OverflowMemtable extends PrimitiveMemTable {
 
   public void clear() {
 //    indexTrees.clear();
-    this.clear();
+    super.clear();
   }
 }


[incubator-iotdb] 01/03: add overflow IO

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kangrong pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit d4ee6470bd52d9a8393d45d7e174b38c403e8130
Author: kr11 <30...@qq.com>
AuthorDate: Tue Jun 11 19:45:18 2019 +0800

    add overflow IO
---
 .../iotdb/db/engine/overflow/io/OverflowIO.java    |   3 +-
 .../db/engine/overflow/io/OverflowMemtable.java    | 104 ++++++-------
 .../db/engine/overflow/io/OverflowProcessor.java   | 167 +++++++++++++++------
 .../db/engine/overflow/io/OverflowResource.java    |  18 ++-
 .../engine/overflow/io/OverflowResourceTest.java   |   3 +-
 5 files changed, 187 insertions(+), 108 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowIO.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowIO.java
index 90f0423..84dfe2c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowIO.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowIO.java
@@ -43,7 +43,8 @@ public class OverflowIO extends TsFileIOWriter {
   }
 
   public void clearRowGroupMetadatas() {
-    super.chunkGroupMetaDataList.clear();
+    super.flushingChunkGroupMetaDataList.clear();
+    super.flushedChunkGroupMetaDataList.clear();
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java
index 5c3d3f5..46231fc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowMemtable.java
@@ -36,74 +36,70 @@ import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 /**
  * This class is used to store and query all overflow data in memory.<br>
  */
-public class OverflowMemtable {
-
-  /**
-   * store update and delete data
-   */
-  private Map<String, Map<String, LongStatistics>> indexTrees;
-
-  /**
-   * store insert data
-   */
-  private IMemTable memTable;
-
-  public OverflowMemtable() {
-    indexTrees = new HashMap<>();
-    memTable = new PrimitiveMemTable();
-  }
-
-  public void insert(TSRecord tsRecord) {
-    for (DataPoint dataPoint : tsRecord.dataPointList) {
-      memTable.write(tsRecord.deviceId, dataPoint.getMeasurementId(), dataPoint.getType(),
-              tsRecord.time,
-              dataPoint.getValue().toString());
-    }
-  }
-
-  /**
-   * @deprecated update time series data
-   */
-  @Deprecated
-  public void update(String deviceId, String measurementId, long startTime, long endTime,
-                     TSDataType dataType,
-                     byte[] value) {
-    if (!indexTrees.containsKey(deviceId)) {
-      indexTrees.put(deviceId, new HashMap<>());
-    }
-    if (!indexTrees.get(deviceId).containsKey(measurementId)) {
-      indexTrees.get(deviceId).put(measurementId, new LongStatistics());
-    }
-    indexTrees.get(deviceId).get(measurementId).updateStats(startTime, endTime);
-  }
+public class OverflowMemtable extends PrimitiveMemTable {
+
+//  /**
+//   * store update and delete data
+//   */
+//  private Map<String, Map<String, LongStatistics>> indexTrees;
+//
+//  /**
+//   * store insert data
+//   */
+//  private IMemTable memTable;
+//
+//  public OverflowMemtable() {
+//    indexTrees = new HashMap<>();
+//    memTable = new PrimitiveMemTable();
+//  }
+//
+//  public void insert(TSRecord tsRecord) {
+//    for (DataPoint dataPoint : tsRecord.dataPointList) {
+//      memTable.write(tsRecord.deviceId, dataPoint.getMeasurementId(), dataPoint.getType(),
+//              tsRecord.time,
+//              dataPoint.getValue().toString());
+//    }
+//  }
+//
+//  /**
+//   * @deprecated update time series data
+//   */
+//  @Deprecated
+//  public void update(String deviceId, String measurementId, long startTime, long endTime,
+//                     TSDataType dataType,
+//                     byte[] value) {
+//    if (!indexTrees.containsKey(deviceId)) {
+//      indexTrees.put(deviceId, new HashMap<>());
+//    }
+//    if (!indexTrees.get(deviceId).containsKey(measurementId)) {
+//      indexTrees.get(deviceId).put(measurementId, new LongStatistics());
+//    }
+//    indexTrees.get(deviceId).get(measurementId).updateStats(startTime, endTime);
+//  }
 
   public void delete(String deviceId, String measurementId, long timestamp, boolean isFlushing) {
-    if (isFlushing) {
-      memTable.delete(new Deletion(deviceId + PATH_SEPARATOR + measurementId, 0, timestamp));
-    } else {
-      memTable.delete(deviceId, measurementId, timestamp);
-    }
+    super.delete(deviceId, measurementId, timestamp);
   }
 
   public ReadOnlyMemChunk queryOverflowInsertInMemory(String deviceId, String measurementId,
       TSDataType dataType, Map<String, String> props) {
-    return memTable.query(deviceId, measurementId, dataType, props);
+    return super.query(deviceId, measurementId, dataType, props);
   }
 
   public boolean isEmptyOfOverflowSeriesMap() {
-    return indexTrees.isEmpty();
+    return super.isEmpty();
   }
 
-  public Map<String, Map<String, LongStatistics>> getOverflowSeriesMap() {
-    return indexTrees;
-  }
+//  public Map<String, Map<String, LongStatistics>> getOverflowSeriesMap() {
+//    return super;
+//  }
 
   public boolean isEmptyOfMemTable() {
-    return memTable.isEmpty();
+    return this.isEmpty();
   }
 
   public IMemTable getMemTabale() {
-    return memTable;
+    return this;
   }
 
   public long getSize() {
@@ -112,7 +108,7 @@ public class OverflowMemtable {
   }
 
   public void clear() {
-    indexTrees.clear();
-    memTable.clear();
+//    indexTrees.clear();
+    this.clear();
   }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index b6bab39..4f77c06 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.engine.overflow.io;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+
 import java.io.File;
 import java.io.IOException;
 import java.time.Instant;
@@ -41,7 +43,12 @@ import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
 import org.apache.iotdb.db.engine.filenode.FileNodeManager;
 import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
 import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
+import org.apache.iotdb.db.engine.memtable.MemTableFlushCallBack;
+import org.apache.iotdb.db.engine.memtable.MemTablePool;
+import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.pool.FlushManager;
 import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
@@ -49,6 +56,7 @@ import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 import org.apache.iotdb.db.exception.OverflowProcessorException;
 import org.apache.iotdb.db.qp.constant.DatetimeUtils;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -65,6 +73,7 @@ import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,9 +84,10 @@ public class OverflowProcessor extends Processor {
   private OverflowResource workResource;
   private OverflowResource mergeResource;
 
-  private OverflowMemtable workSupport;
-  private OverflowMemtable flushSupport;
-
+  private List<IMemTable> overflowFlushMemTables = new ArrayList<>();
+  private IMemTable workSupport;
+//  private OverflowMemtable flushSupport;
+  private long flushId = -1;
   private volatile Future<Boolean> flushFuture = new ImmediateFuture<>(true);
   private volatile boolean isMerge;
   private int valueCount;
@@ -115,7 +125,6 @@ public class OverflowProcessor extends Processor {
     overflowFlushAction = parameters.get(FileNodeConstants.OVERFLOW_FLUSH_ACTION);
     filenodeFlushAction = parameters
         .get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
-
     reopen();
     getLogNode();
   }
@@ -133,13 +142,14 @@ public class OverflowProcessor extends Processor {
 
     // memory
     if (workSupport == null) {
-      workSupport = new OverflowMemtable();
+      workSupport = new PrimitiveMemTable();
     } else {
       workSupport.clear();
     }
     isClosed = false;
     isFlush = false;
   }
+
   public void checkOpen() throws OverflowProcessorException {
     if (isClosed) {
       throw new OverflowProcessorException("OverflowProcessor already closed");
@@ -229,7 +239,6 @@ public class OverflowProcessor extends Processor {
     }
 
 
-
   }
 
   /**
@@ -238,8 +247,9 @@ public class OverflowProcessor extends Processor {
   @Deprecated
   public void update(String deviceId, String measurementId, long startTime, long endTime,
       TSDataType type, byte[] value) {
-    workSupport.update(deviceId, measurementId, startTime, endTime, type, value);
-    valueCount++;
+//    workSupport.update(deviceId, measurementId, startTime, endTime, type, value);
+//    valueCount++;
+    throw new UnsupportedOperationException("update has been deprecated");
   }
 
   /**
@@ -248,9 +258,10 @@ public class OverflowProcessor extends Processor {
   @Deprecated
   public void update(String deviceId, String measurementId, long startTime, long endTime,
       TSDataType type, String value) {
-    workSupport.update(deviceId, measurementId, startTime, endTime, type,
-        convertStringToBytes(type, value));
-    valueCount++;
+//    workSupport.update(deviceId, measurementId, startTime, endTime, type,
+//        convertStringToBytes(type, value));
+//    valueCount++;
+    throw new UnsupportedOperationException("update has been deprecated");
   }
 
   private byte[] convertStringToBytes(TSDataType type, String o) {
@@ -291,10 +302,14 @@ public class OverflowProcessor extends Processor {
       throw new IOException(e);
     }
     workResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
-    workSupport.delete(deviceId, measurementId, timestamp, false);
+    workSupport.delete(deviceId, measurementId, timestamp);
     if (isFlush()) {
       mergeResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
-      flushSupport.delete(deviceId, measurementId, timestamp, true);
+      for (IMemTable memTable : overflowFlushMemTables) {
+        if (memTable.containSeries(deviceId, measurementId)) {
+          memTable.delete(new Deletion(deviceId + PATH_SEPARATOR + measurementId, 0, timestamp));
+        }
+      }
     }
   }
 
@@ -356,14 +371,11 @@ public class OverflowProcessor extends Processor {
     MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
     queryFlushLock.lock();
     try {
-      if (flushSupport != null && isFlush()) {
-        memSeriesLazyMerger
-            .addMemSeries(
-                flushSupport.queryOverflowInsertInMemory(deviceId, measurementId, dataType, props));
+      if (!overflowFlushMemTables.isEmpty() && isFlush()) {
+        for (IMemTable memTable : overflowFlushMemTables) {
+          memSeriesLazyMerger.addMemSeries(memTable.query(deviceId, measurementId, dataType, props));
+        }
       }
-      memSeriesLazyMerger
-          .addMemSeries(workSupport.queryOverflowInsertInMemory(deviceId, measurementId,
-              dataType, props));
       // memSeriesLazyMerger has handled the props,
       // so we do not need to handle it again in the following readOnlyMemChunk
       return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger, Collections.emptyMap());
@@ -431,22 +443,13 @@ public class OverflowProcessor extends Processor {
         mergeResource.getInsertMetadatas(deviceId, measurementId, dataType, context));
   }
 
-  private void switchWorkToFlush() {
-    queryFlushLock.lock();
-    try {
-      OverflowMemtable temp = flushSupport == null ? new OverflowMemtable() : flushSupport;
-      flushSupport = workSupport;
-      workSupport = temp;
-      isFlush = true;
-    } finally {
-      queryFlushLock.unlock();
-    }
-  }
 
   private void switchFlushToWork() {
+    LOGGER.info("Overflow Processor {} try to get flushQueryLock for switchFlushToWork", getProcessorName());
     queryFlushLock.lock();
+    LOGGER.info("Overflow Processor {} get flushQueryLock for switchFlushToWork", getProcessorName());
     try {
-      flushSupport.clear();
+//      flushSupport.clear();
       workResource.appendMetadatas();
       isFlush = false;
     } finally {
@@ -483,16 +486,26 @@ public class OverflowProcessor extends Processor {
     return isFlush;
   }
 
-  private boolean flushTask(String displayMessage, long walTaskId) {
+  private void removeFlushedMemTable(IMemTable memTable, TsFileIOWriter overflowIOWriter) {
+    this.writeLock();
+    overflowIOWriter.mergeChunkGroupMetaData();
+    try {
+      overflowFlushMemTables.remove(memTable);
+    } finally {
+      this.writeUnlock();
+    }
+  }
+
+  private boolean flushTask(String displayMessage, IMemTable currentMemTableToFlush, long walTaskId,
+      long flushId, MemTableFlushCallBack removeFlushedMemTable) {
     boolean result;
     long flushStartTime = System.currentTimeMillis();
     try {
       LOGGER.info("The overflow processor {} starts flushing {}.", getProcessorName(),
-                  displayMessage);
+          displayMessage);
       // flush data
       workResource
-          .flush(fileSchema, flushSupport.getMemTabale(),
-              getProcessorName());
+          .flush(fileSchema, currentMemTableToFlush, getProcessorName(), flushId, removeFlushedMemTable);
       filenodeFlushAction.act();
       // write-ahead log
       if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
@@ -508,8 +521,8 @@ public class OverflowProcessor extends Processor {
           Thread.currentThread().getName(), e);
       result = false;
     } finally {
-        // switch from flush to work.
-        switchFlushToWork();
+      // switch from flush to work.
+      switchFlushToWork();
     }
     // log flush time
     if (LOGGER.isInfoEnabled()) {
@@ -543,11 +556,11 @@ public class OverflowProcessor extends Processor {
           (thisFLushTime - lastFlushTime) / 1000);
     }
     lastFlushTime = System.currentTimeMillis();
-    try {
-      flushFuture.get();
-    } catch (InterruptedException | ExecutionException e) {
-      throw new IOException(e);
-    }
+//    try {
+//      flushFuture.get();
+//    } catch (InterruptedException | ExecutionException e) {
+//      throw new IOException(e);
+//    }
     if (valueCount > 0) {
       try {
         // backup newIntervalFile list and emptyIntervalFileNode
@@ -569,12 +582,22 @@ public class OverflowProcessor extends Processor {
       BasicMemController.getInstance().releaseUsage(this, memSize.get());
       memSize.set(0);
       valueCount = 0;
+
+//      long version = versionController.nextVersion();
+      //add mmd
+      overflowFlushMemTables.add(workSupport);
+      IMemTable tmpMemTableToFlush = workSupport;
+      workSupport = MemTablePool.getInstance().getEmptyMemTable();
+      flushFuture = FlushManager.getInstance().submit(() -> flushTask("asynchronously",
+          tmpMemTableToFlush, walTaskId, flushId, this::removeFlushedMemTable));
+
       // switch from work to flush
-      switchWorkToFlush();
-      flushFuture = FlushManager.getInstance().submit( () ->
-          flushTask("asynchronously", walTaskId));
+//      switchWorkToFlush();
+//      flushFuture = FlushManager.getInstance().submit(() ->
+//          flushTask("asynchronously", walTaskId));
     } else {
-      flushFuture = new ImmediateFuture(true);
+//      flushFuture = new ImmediateFuture(true);
+      LOGGER.info("Nothing data points to be flushed");
     }
     return flushFuture;
 
@@ -713,6 +736,7 @@ public class OverflowProcessor extends Processor {
 
   /**
    * used for test. We can block to wait for finishing flushing.
+   *
    * @return the future of the flush() task.
    */
   public Future<Boolean> getFlushFuture() {
@@ -721,6 +745,7 @@ public class OverflowProcessor extends Processor {
 
   /**
    * used for test. We can know when the flush() is called.
+   *
    * @return the last flush() time.
    */
   public long getLastFlushTime() {
@@ -735,4 +760,52 @@ public class OverflowProcessor extends Processor {
   public boolean isClosed() {
     return isClosed;
   }
+
+
+//  private void switchWorkToFlush() {
+//    queryFlushLock.lock();
+//    try {
+//      Pair<> workSupport;
+//      workSupport = new OverflowMemtable();
+//      if(isFlush){
+//        // isFlush = true, indicating an AsyncFlushThread has been running, only add Current overflowInfo
+//        // into List.
+//
+//
+//      }else {
+//        isFlush = true;
+////        flushFuture = FlushManager.getInstance().submit(() ->
+//            flushTask("asynchronously", walTaskId));
+//      }
+//    } finally {
+//      queryFlushLock.unlock();
+//    }
+//  }
+
+//  private List<Pair<OverflowMemtable, OverflowResource>> flushTaskList;
+//
+//  private class AsyncFlushThread implements Runnable {
+//
+//    @Override
+//    public void run() {
+//      Pair<OverflowMemtable, OverflowResource> flushInfo;
+//      while (true) {
+//        queryFlushLock.lock();
+//        try {
+//          if (flushTaskList.isEmpty()) {
+//            // flushTaskList is empty, thus all flush tasks have done and switch
+//            OverflowMemtable temp = flushSupport == null ? new OverflowMemtable() : flushSupport;
+//            flushSupport = workSupport;
+//            workSupport = temp;
+//            isFlush = true;
+//            break;
+//          }
+//          flushInfo = flushTaskList.remove(0);
+//        } finally {
+//          queryFlushLock.unlock();
+//        }
+//        flush(flushInfo);
+//      }
+//    }
+//  }
 }
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
index e7167fb..a5484fb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.MemTableFlushCallBack;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
 import org.apache.iotdb.db.engine.modification.Deletion;
@@ -206,12 +207,13 @@ public class OverflowResource {
     return chunkMetaDatas;
   }
 
-  public void flush(FileSchema fileSchema, IMemTable memTable, String processorName)
+  public void flush(FileSchema fileSchema, IMemTable memTable, String processorName,
+      long flushId, MemTableFlushCallBack removeFlushedMemTable)
       throws IOException {
     // insert data
     long startPos = insertIO.getPos();
     long startTime = System.currentTimeMillis();
-    flush2(fileSchema, memTable, processorName);
+    flush2(fileSchema, memTable, processorName, flushId, removeFlushedMemTable);
     long timeInterval = System.currentTimeMillis() - startTime;
     timeInterval = timeInterval == 0 ? 1 : timeInterval;
     long insertSize = insertIO.getPos() - startPos;
@@ -226,14 +228,20 @@ public class OverflowResource {
     writePositionInfo(insertIO.getPos(), 0);
   }
 
-  public void flush2(FileSchema fileSchema, IMemTable memTable, String processorName) throws IOException {
+  public void flush2(FileSchema fileSchema, IMemTable memTable, String processorName,
+      long flushId, MemTableFlushCallBack removeFlushedMemTable) throws IOException {
     if (memTable != null && !memTable.isEmpty()) {
       insertIO.toTail();
       long lastPosition = insertIO.getPos();
 //      MemTableFlushUtil.flushMemTable(fileSchema, insertIO, memTable,
 //          versionController.nextVersion());
-      MemTableFlushTask task = new MemTableFlushTask(insertIO, processorName);
-      task.flushMemTable(fileSchema, memTable, versionController.nextVersion());
+
+//      MemTableFlushTask task = new MemTableFlushTask(insertIO, processorName);
+      MemTableFlushTask tableFlushTask = new MemTableFlushTask(insertIO, processorName, flushId,
+          removeFlushedMemTable);
+      tableFlushTask.flushMemTable(fileSchema, memTable, versionController.nextVersion());
+
+
       List<ChunkGroupMetaData> rowGroupMetaDatas = insertIO.getChunkGroupMetaDatas();
       appendInsertMetadatas.addAll(rowGroupMetaDatas);
       if (!rowGroupMetaDatas.isEmpty()) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
index c7662bf..24509f1 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
@@ -59,7 +59,8 @@ public class OverflowResourceTest {
   public void testOverflowInsert() throws IOException {
     OverflowTestUtils.produceInsertData(memtable);
     QueryContext context = new QueryContext();
-    work.flush(OverflowTestUtils.getFileSchema(), memtable.getMemTabale(), "processorName");
+    work.flush(OverflowTestUtils.getFileSchema(), memtable.getMemTabale(),
+        "processorName", 0, (k,v)->{});
     List<ChunkMetaData> chunkMetaDatas = work.getInsertMetadatas(OverflowTestUtils.deviceId1,
         OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2, context);
     assertEquals(0, chunkMetaDatas.size());


[incubator-iotdb] 03/03: Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kangrong pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit c3bbccf7e8ade25f2e487689266dbd6cbfb5a960
Merge: c9b8dad 63f4311
Author: kr11 <30...@qq.com>
AuthorDate: Wed Jun 12 09:58:06 2019 +0800

    Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
    
    # Conflicts:
    #	iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowIO.java
    #	iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java

 .../java/org/apache/iotdb/db/engine/Processor.java | 31 +++++++-
 .../engine/bufferwrite/BufferWriteProcessor.java   | 70 +++++++++++++-----
 .../db/engine/filenode/CopyOnWriteLinkedList.java  | 12 ++--
 .../iotdb/db/engine/filenode/FileNodeManager.java  | 61 ++++++++++++++--
 .../db/engine/filenode/FileNodeProcessor.java      | 84 ++++++++++++----------
 .../iotdb/db/engine/memtable/MemTablePool.java     |  5 +-
 .../db/engine/overflow/io/OverflowResource.java    |  2 -
 .../iotdb/db/query/control/FileReaderManager.java  |  2 +-
 .../recover/StorageGroupRecoverPerformer.java      | 12 ++--
 .../engine/bufferwrite/BufferWriteBenchmark.java   | 76 ++++++++++----------
 10 files changed, 239 insertions(+), 116 deletions(-)

diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
index a5484fb,d56eb3f..a37653c
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
@@@ -235,13 -232,8 +235,11 @@@ public class OverflowResource 
        long lastPosition = insertIO.getPos();
  //      MemTableFlushUtil.flushMemTable(fileSchema, insertIO, memTable,
  //          versionController.nextVersion());
- 
 -      MemTableFlushTask task = new MemTableFlushTask(insertIO, processorName, 0L, (a, b) -> {});
 -      task.flushMemTable(fileSchema, memTable, versionController.nextVersion());
 +//      MemTableFlushTask task = new MemTableFlushTask(insertIO, processorName);
 +      MemTableFlushTask tableFlushTask = new MemTableFlushTask(insertIO, processorName, flushId,
 +          removeFlushedMemTable);
 +      tableFlushTask.flushMemTable(fileSchema, memTable, versionController.nextVersion());
 +
- 
        List<ChunkGroupMetaData> rowGroupMetaDatas = insertIO.getChunkGroupMetaDatas();
        appendInsertMetadatas.addAll(rowGroupMetaDatas);
        if (!rowGroupMetaDatas.isEmpty()) {