You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/06/27 14:10:18 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: add
more javadocs and Deprecated annotation
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new b1a531d add more javadocs and Deprecated annotation
new 25f7abe Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
b1a531d is described below
commit b1a531dddc428a53fad32683f9aea88e5c98b9a6
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Jun 27 22:09:52 2019 +0800
add more javadocs and Deprecated annotation
---
.../java/org/apache/iotdb/db/engine/Processor.java | 1 +
.../apache/iotdb/db/engine/memtable/IMemTable.java | 4 -
.../db/engine/memtable/IWritableMemChunk.java | 14 ++
.../db/engine/memtable/MemTableFlushTaskV3.java | 262 ---------------------
.../iotdb/db/engine/memtable/MemTablePool.java | 38 +--
.../org/apache/iotdb/db/monitor/StatMonitor.java | 1 -
6 files changed, 34 insertions(+), 286 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
index 7bc9693..eb0028a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/Processor.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
*
* @see BufferWriteProcessor
*/
+@Deprecated
public abstract class Processor {
private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 594dbd2..f6395f8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -22,11 +22,7 @@ import java.util.Map;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
/**
* IMemTable is designed to store data points which are not flushed into TsFile yet. An instance of
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 4888b6b..cf8ccbb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -44,11 +44,25 @@ public interface IWritableMemChunk extends TimeValuePairSorter {
TSDataType getType();
+ /**
+ * Uses the given offset to mark which data are deleted:
+ * data whose timestamps are less than the offset are treated as deleted.
+ * @param offset the time offset; entries with timestamps below it are considered deleted
+ */
void setTimeOffset(long offset);
void releasePrimitiveArrayList();
+ /**
+ * Used when flushing data to disk.
+ * This method removes duplicated data points and sorts the rest.
+ * @return the deduplicated, sorted data
+ */
default DeduplicatedSortedData getDeduplicatedSortedData(){return null;}
+ /**
+ * Serves query requests.
+ * @return the sorted TVList
+ */
default TVList getSortedTVList(){return null;}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV3.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV3.java
deleted file mode 100644
index 12e00d2..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV3.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with the License. You may obtain
- * a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.engine.memtable;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Future;
-import java.util.function.Consumer;
-import org.apache.iotdb.db.engine.pool.FlushSubTaskPoolManager;
-import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
-import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
-import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MemTableFlushTaskV3 {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushTask.class);
- private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte;
- private static final FlushSubTaskPoolManager subTaskPoolManager = FlushSubTaskPoolManager
- .getInstance();
- private Future memoryFlushTask;
- private Future ioFlushTask;
- private NativeRestorableIOWriter tsFileIoWriter;
-
- private ConcurrentLinkedQueue ioTaskQueue = new ConcurrentLinkedQueue();
- private ConcurrentLinkedQueue memoryTaskQueue = new ConcurrentLinkedQueue();
- private volatile boolean stop = false;
- private String storageGroup;
-
- private Consumer<IMemTable> flushCallBack;
- private IMemTable memTable;
-
- public MemTableFlushTaskV3(NativeRestorableIOWriter writer, String storageGroup,
- Consumer<IMemTable> callBack) {
- this.tsFileIoWriter = writer;
- this.storageGroup = storageGroup;
- this.flushCallBack = callBack;
-// this.memoryFlushTask = subTaskPoolManager.submit(memoryFlushThread);
-
- memoryFlushThread.start();
- ioFlushThread.start();
- LOGGER.info("flush task created in Storage group {} ", storageGroup);
- }
-
-
- private Thread memoryFlushThread = new Thread(() -> {
- long memSerializeTime = 0;
- LOGGER.info("Storage group {},start serialize data into mem.", storageGroup);
- while (!stop) {
- if (!memoryTaskQueue.isEmpty()) {
- LOGGER.info("memory task queue is {}", memoryTaskQueue);
- }
- Object task = memoryTaskQueue.poll();
- if (task == null) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- LOGGER.error("Storage group {}, io flush task is interrupted.", storageGroup, e);
- }
- } else {
- if (task instanceof String) {
- LOGGER.info("add String {} to io queue", task);
- ioTaskQueue.add(task);
- } else if (task instanceof Pair) {
- LOGGER.info("add chunk writer {}", task);
- long starTime = System.currentTimeMillis();
- Pair<List<TimeValuePair>, MeasurementSchema> memorySerializeTask = (Pair<List<TimeValuePair>, MeasurementSchema>) task;
- ChunkBuffer chunkBuffer = new ChunkBuffer(memorySerializeTask.right);
- IChunkWriter seriesWriter = new ChunkWriterImpl(memorySerializeTask.right, chunkBuffer,
- PAGE_SIZE_THRESHOLD);
- try {
- writeOneSeries(memorySerializeTask.left, seriesWriter,
- memorySerializeTask.right.getType());
- ioTaskQueue.add(seriesWriter);
- } catch (IOException e) {
- LOGGER.error("Storage group {}, io error.", storageGroup, e);
- throw new RuntimeException(e);
- }
- memSerializeTime += System.currentTimeMillis() - starTime;
- } else {
- LOGGER.info("end chunk group {} io task to io task queue", task.toString());
- ioTaskQueue.add(task);
- }
- }
- }
- LOGGER.info("Storage group {}, flushing a memtable into disk: serialize data into mem cost {} ms.",
- storageGroup, memSerializeTime);
- }, Thread.currentThread().getId() + "-1");
-
-
- //TODO a better way is: for each TsFile, assign it a Executors.singleThreadPool,
- // rather than per each memtable.
- private Thread ioFlushThread = new Thread(() -> {
- long ioTime = 0;
- LOGGER.info("Storage group {}, start io cost.", storageGroup);
- while (!stop) {
- Object seriesWriterOrEndChunkGroupTask = ioTaskQueue.poll();
- if (seriesWriterOrEndChunkGroupTask == null) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- LOGGER.error("Storage group {}, io flush task is interrupted.", storageGroup, e);
- }
- } else {
- long starTime = System.currentTimeMillis();
- try {
- if (seriesWriterOrEndChunkGroupTask instanceof IChunkWriter) {
- LOGGER.info("write series to disk");
- ((IChunkWriter) seriesWriterOrEndChunkGroupTask).writeToFileWriter(tsFileIoWriter);
- } else if (seriesWriterOrEndChunkGroupTask instanceof String) {
- LOGGER.info("start chunk group");
- tsFileIoWriter.startChunkGroup((String) seriesWriterOrEndChunkGroupTask);
- } else {
- LOGGER.info("end chunk group {} io task from task queue", seriesWriterOrEndChunkGroupTask.toString());
- LOGGER.info("end chunk group");
- ChunkGroupIoTask task = (ChunkGroupIoTask) seriesWriterOrEndChunkGroupTask;
- tsFileIoWriter.endChunkGroup(task.version);
- task.finished = true;
- }
- } catch (IOException e) {
- LOGGER.error("Storage group {}, io error.", storageGroup, e);
- throw new RuntimeException(e);
- }
- ioTime += System.currentTimeMillis() - starTime;
- }
- }
- LOGGER.info("flushing a memtable in storage group {}, cost {}ms", storageGroup, ioTime);
- }, Thread.currentThread().getId() + "-2");
-
-
- private void writeOneSeries(List<TimeValuePair> tvPairs, IChunkWriter seriesWriterImpl,
- TSDataType dataType)
- throws IOException {
- for (TimeValuePair timeValuePair : tvPairs) {
- switch (dataType) {
- case BOOLEAN:
- seriesWriterImpl
- .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
- break;
- case INT32:
- seriesWriterImpl.write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getInt());
- break;
- case INT64:
- seriesWriterImpl.write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getLong());
- break;
- case FLOAT:
- seriesWriterImpl.write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getFloat());
- break;
- case DOUBLE:
- seriesWriterImpl
- .write(timeValuePair.getTimestamp(),
- timeValuePair.getValue().getDouble());
- break;
- case TEXT:
- seriesWriterImpl
- .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
- break;
- default:
- LOGGER.error("Storage group {}, don't support data type: {}", storageGroup,
- dataType);
- break;
- }
- }
- }
-
- /**
- * the function for flushing memtable.
- */
- public void flushMemTable(FileSchema fileSchema, IMemTable imemTable) {
- long sortTime = 0;
- ChunkGroupIoTask theLastTask = EMPTY_TASK;
- this.memTable = imemTable;
- LOGGER.info("Current thread id is {}" , Thread.currentThread().getId());
- for (String deviceId : imemTable.getMemTableMap().keySet()) {
- memoryTaskQueue.add(deviceId);
- int seriesNumber = imemTable.getMemTableMap().get(deviceId).size();
- LOGGER.info("series number: {}", seriesNumber);
- LOGGER.info("add device, memory queue {}", memoryTaskQueue);
- for (String measurementId : imemTable.getMemTableMap().get(deviceId).keySet()) {
- long startTime = System.currentTimeMillis();
- // TODO if we can not use TSFileIO writer, then we have to redesign the class of TSFileIO.
- IWritableMemChunk series = imemTable.getMemTableMap().get(deviceId).get(measurementId);
- MeasurementSchema desc = fileSchema.getMeasurementSchema(measurementId);
- List<TimeValuePair> sortedTimeValuePairs = series.getSortedTimeValuePairList();
- sortTime += System.currentTimeMillis() - startTime;
- LOGGER.info("add seies writer in flush thread {}", sortedTimeValuePairs);
- memoryTaskQueue.add(new Pair<>(sortedTimeValuePairs, desc));
- LOGGER.info("add series writer, memory queue {}", memoryTaskQueue);
- }
- theLastTask = new ChunkGroupIoTask(seriesNumber, deviceId, imemTable.getVersion());
- LOGGER.info("ChunkGroupIoTask task {}", theLastTask.toString());
- memoryTaskQueue.add(theLastTask);
- LOGGER.info("add chunk group to task, memory queue {}", memoryTaskQueue);
- }
- LOGGER.info(
- "{}, flushing a memtable into disk: data sort time cost {} ms.",
- storageGroup, sortTime);
- while (!theLastTask.finished) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- LOGGER.error("Storage group {}, flush memtable table thread is interrupted.",
- storageGroup, e);
- throw new RuntimeException(e);
- }
- }
- stop = true;
-
- while (ioFlushThread.isAlive()) {
- }
-
- LOGGER.info("flushing a memtable finished!");
- flushCallBack.accept(memTable);
- }
-
-
- static class ChunkGroupIoTask {
-
- int seriesNumber;
- String deviceId;
- long version;
- volatile boolean finished;
-
- public ChunkGroupIoTask(int seriesNumber, String deviceId, long version) {
- this(seriesNumber, deviceId, version, false);
- }
-
- public ChunkGroupIoTask(int seriesNumber, String deviceId, long version, boolean finished) {
- this.seriesNumber = seriesNumber;
- this.deviceId = deviceId;
- this.version = version;
- this.finished = finished;
- }
- }
-
- private static ChunkGroupIoTask EMPTY_TASK = new ChunkGroupIoTask(0, "", 0, true);
-
-}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index 4902ca6..31ec7ac 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -28,7 +28,7 @@ public class MemTablePool {
private static final Logger LOGGER = LoggerFactory.getLogger(MemTablePool.class);
- private static final Deque<IMemTable> emptyMemTables = new ArrayDeque<>();
+ private static final Deque<IMemTable> availableMemTables = new ArrayDeque<>();
/**
* >= number of storage group * 2
@@ -45,30 +45,30 @@ public class MemTablePool {
}
public IMemTable getEmptyMemTable(Object applier) {
- synchronized (emptyMemTables) {
- if (emptyMemTables.isEmpty() && size < capacity) {
+ synchronized (availableMemTables) {
+ if (availableMemTables.isEmpty() && size < capacity) {
size++;
LOGGER.info("generated a new memtable for {}, system memtable size: {}, stack size: {}",
- applier, size, emptyMemTables.size());
+ applier, size, availableMemTables.size());
return new PrimitiveMemTable();
- } else if (!emptyMemTables.isEmpty()) {
+ } else if (!availableMemTables.isEmpty()) {
LOGGER
.info("system memtable size: {}, stack size: {}, then get a memtable from stack for {}",
- size, emptyMemTables.size(), applier);
- return emptyMemTables.pop();
+ size, availableMemTables.size(), applier);
+ return availableMemTables.pop();
}
// wait until some one has released a memtable
int waitCount = 1;
while (true) {
- if (!emptyMemTables.isEmpty()) {
+ if (!availableMemTables.isEmpty()) {
LOGGER.info(
"system memtable size: {}, stack size: {}, then get a memtable from stack for {}",
- size, emptyMemTables.size(), applier);
- return emptyMemTables.pop();
+ size, availableMemTables.size(), applier);
+ return availableMemTables.pop();
}
try {
- emptyMemTables.wait(WAIT_TIME);
+ availableMemTables.wait(WAIT_TIME);
} catch (InterruptedException e) {
LOGGER.error("{} fails to wait fot memtables {}, continue to wait", applier, e);
}
@@ -78,20 +78,20 @@ public class MemTablePool {
}
public void putBack(IMemTable memTable) {
- synchronized (emptyMemTables) {
+ synchronized (availableMemTables) {
memTable.clear();
- emptyMemTables.push(memTable);
- emptyMemTables.notify();
- LOGGER.info("a memtable returned, stack size {}", emptyMemTables.size());
+ availableMemTables.push(memTable);
+ availableMemTables.notify();
+ LOGGER.info("a memtable returned, stack size {}", availableMemTables.size());
}
}
public void putBack(IMemTable memTable, String storageGroup) {
- synchronized (emptyMemTables) {
+ synchronized (availableMemTables) {
memTable.clear();
- emptyMemTables.push(memTable);
- emptyMemTables.notify();
- LOGGER.info("{} return a memtable, stack size {}", storageGroup, emptyMemTables.size());
+ availableMemTables.push(memTable);
+ availableMemTables.notify();
+ LOGGER.info("{} return a memtable, stack size {}", storageGroup, availableMemTables.size());
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index 7a6b0cb..69f0a95 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.filenodeV2.FileNodeManagerV2;
import org.apache.iotdb.db.exception.FileNodeManagerException;
-import org.apache.iotdb.db.exception.MetadataArgsErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.metadata.MManager;