You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ka...@apache.org on 2019/07/04 02:25:16 UTC
[incubator-iotdb] 01/01: init,
create java class. And a constructor for PublicBAOS.
This is an automated email from the ASF dual-hosted git repository.
kangrong pushed a commit to branch feature_chunkbufferpool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit b5eb37a7cf12c854aa5f84d83124fbb4b7c04d13
Author: kr11 <30...@qq.com>
AuthorDate: Thu Jul 4 10:24:54 2019 +0800
init, create java class. And a constructor for PublicBAOS.
---
.../db/utils/datastructure/ListPublicBAOSPool.java | 166 +++++++++++++++++++++
.../db/utils/datastructure/ListPublicBOAS.java | 91 +++++++++++
.../iotdb/tsfile/write/chunk/ChunkBuffer.java | 11 ++
3 files changed, 268 insertions(+)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/ListPublicBAOSPool.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/ListPublicBAOSPool.java
new file mode 100644
index 0000000..6e6743c
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/ListPublicBAOSPool.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils.datastructure;
+
+import java.util.ArrayDeque;
+import java.util.EnumMap;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+/**
+ * Just copy from {@linkplain org.apache.iotdb.db.rescon.PrimitiveArrayPool PrimitiveArrayPool}.
+ *
+ * Provide and recycle {@code byte[]} in {@linkplain ListPublicBOAS}.
+ *
+ * @author Pengze Lv, kangrong
+ */
+public class ListPublicBAOSPool {
+
+ /**
+ * data type -> Array<PrimitiveArray>
+ */
+ private static final EnumMap<TSDataType, ArrayDeque> primitiveArraysMap = new EnumMap<>(TSDataType.class);
+
+ public static final int ARRAY_SIZE = 128;
+
+ static {
+ primitiveArraysMap.put(TSDataType.BOOLEAN, new ArrayDeque());
+ primitiveArraysMap.put(TSDataType.INT32, new ArrayDeque());
+ primitiveArraysMap.put(TSDataType.INT64, new ArrayDeque());
+ primitiveArraysMap.put(TSDataType.FLOAT, new ArrayDeque());
+ primitiveArraysMap.put(TSDataType.DOUBLE, new ArrayDeque());
+ primitiveArraysMap.put(TSDataType.TEXT, new ArrayDeque());
+ }
+
+ public static ListPublicBAOSPool getInstance() {
+ return INSTANCE;
+ }
+
+ private static final ListPublicBAOSPool INSTANCE = new ListPublicBAOSPool();
+
+
+ private ListPublicBAOSPool() {}
+
+ public synchronized Object getPrimitiveDataListByType(TSDataType dataType) {
+ ArrayDeque dataListQueue = primitiveArraysMap.computeIfAbsent(dataType, k ->new ArrayDeque<>());
+ Object dataArray = dataListQueue.poll();
+ switch (dataType) {
+ case BOOLEAN:
+ if (dataArray == null) {
+ dataArray = new boolean[ARRAY_SIZE];
+ }
+ break;
+ case INT32:
+ if (dataArray == null) {
+ dataArray = new int[ARRAY_SIZE];
+ }
+ break;
+ case INT64:
+ if (dataArray == null) {
+ dataArray = new long[ARRAY_SIZE];
+ }
+ break;
+ case FLOAT:
+ if (dataArray == null) {
+ dataArray = new float[ARRAY_SIZE];
+ }
+ break;
+ case DOUBLE:
+ if (dataArray == null) {
+ dataArray = new double[ARRAY_SIZE];
+ }
+ break;
+ case TEXT:
+ if (dataArray == null) {
+ dataArray = new Binary[ARRAY_SIZE];
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException("DataType: " + dataType);
+ }
+ return dataArray;
+ }
+
+
+ public synchronized void release(Object dataArray) {
+ if (dataArray instanceof boolean[]) {
+ primitiveArraysMap.get(TSDataType.BOOLEAN).add(dataArray);
+ } else if (dataArray instanceof int[]) {
+ primitiveArraysMap.get(TSDataType.INT32).add(dataArray);
+ } else if (dataArray instanceof long[]){
+ primitiveArraysMap.get(TSDataType.INT64).add(dataArray);
+ } else if (dataArray instanceof float[]) {
+ primitiveArraysMap.get(TSDataType.FLOAT).add(dataArray);
+ } else if (dataArray instanceof double[]) {
+ primitiveArraysMap.get(TSDataType.DOUBLE).add(dataArray);
+ } else if (dataArray instanceof Binary[]) {
+ primitiveArraysMap.get(TSDataType.TEXT).add(dataArray);
+ }
+ }
+
+ /**
+ * @param size needed capacity
+ * @return an array of primitive data arrays
+ */
+ public synchronized Object getDataListsByType(TSDataType dataType, int size) {
+ int arrayNumber = (int) Math.ceil((float) size / (float)ARRAY_SIZE);
+ switch (dataType) {
+ case BOOLEAN:
+ boolean[][] booleans = new boolean[arrayNumber][];
+ for (int i = 0; i < arrayNumber; i++) {
+ booleans[i] = (boolean[]) getPrimitiveDataListByType(dataType);
+ }
+ return booleans;
+ case INT32:
+ int[][] ints = new int[arrayNumber][];
+ for (int i = 0; i < arrayNumber; i++) {
+ ints[i] = (int[]) getPrimitiveDataListByType(dataType);
+ }
+ return ints;
+ case INT64:
+ long[][] longs = new long[arrayNumber][];
+ for (int i = 0; i < arrayNumber; i++) {
+ longs[i] = (long[]) getPrimitiveDataListByType(dataType);
+ }
+ return longs;
+ case FLOAT:
+ float[][] floats = new float[arrayNumber][];
+ for (int i = 0; i < arrayNumber; i++) {
+ floats[i] = (float[]) getPrimitiveDataListByType(dataType);
+ }
+ return floats;
+ case DOUBLE:
+ double[][] doubles = new double[arrayNumber][];
+ for (int i = 0; i < arrayNumber; i++) {
+ doubles[i] = (double[]) getPrimitiveDataListByType(dataType);
+ }
+ return doubles;
+ case TEXT:
+ Binary[][] binaries = new Binary[arrayNumber][];
+ for (int i = 0; i < arrayNumber; i++) {
+ binaries[i] = (Binary[]) getPrimitiveDataListByType(dataType);
+ }
+ return binaries;
+ default:
+ return null;
+ }
+ }
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/ListPublicBOAS.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/ListPublicBOAS.java
new file mode 100644
index 0000000..c6b7f96
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/ListPublicBOAS.java
@@ -0,0 +1,91 @@
+package org.apache.iotdb.db.utils.datastructure;
+
+import java.io.OutputStream;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+/**
+ * We reimplement {@linkplain PublicBAOS}, replacing the underlying {@linkplain
+ * java.io.ByteArrayOutputStream} with a {@code List } of {@code byte[]}.
+ *
+ * For efficient and controllable GC, all {@code byte[]} are allocated from {@linkplain
+ * ListPublicBAOSPool} and should be put back after {@code close}.
+ *
+ * Referring to {@linkplain TVList} and {@linkplain org.apache.iotdb.db.rescon.PrimitiveArrayPool PrimitiveArrayPool}.
+ *
+ * So far, this class is only used in {@linkplain org.apache.iotdb.db.engine.memtable.MemTableFlushTask
+ * MemTableFlushTask}.
+ *
+ * @author Pengze Lv, kangrong
+ */
+public class ListPublicBOAS extends PublicBAOS {
+
+ public ListPublicBOAS() {
+ super();
+ // TODO
+ throw new UnsupportedOperationException();
+ }
+
+ public ListPublicBOAS(int size) {
+ super(size);
+ // TODO
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public synchronized void write(byte b[], int off, int len) {
+ // TODO
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public synchronized void write(int b) {
+ // TODO
+ throw new UnsupportedOperationException();
+ }
+
+
+ public byte[] getBuf() {
+ // TODO
+ throw new UnsupportedOperationException();
+ }
+
+ public synchronized void reset() {
+ // TODO
+ throw new UnsupportedOperationException();
+ }
+
+ public synchronized int size() {
+ // TODO
+ throw new UnsupportedOperationException();
+
+ }
+
+ public void close() {
+ // TODO
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * We are not sure whether following functions will be invoked in PublicBOAS. We'd defer to
+ * implement them, but throw exception to avoid unexpected calling.
+ */
+ public synchronized void writeTo(OutputStream out) {
+ throw new UnsupportedOperationException();
+ }
+
+
+ public synchronized byte toByteArray()[] {
+ throw new UnsupportedOperationException();
+ }
+
+ public synchronized String toString() {
+ throw new UnsupportedOperationException();
+ }
+
+ public synchronized String toString(String charsetName) {
+ throw new UnsupportedOperationException();
+ }
+
+
+}
+
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
index 290c516..28caa1c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
@@ -67,6 +67,17 @@ public class ChunkBuffer {
this.pageBuffer = new PublicBAOS();
}
+ /**
+ * constructor of ChunkBuffer.
+ *
+ * @param schema measurement schema
+ */
+ public ChunkBuffer(MeasurementSchema schema, PublicBAOS pageBuffer) {
+ this.schema = schema;
+ this.compressor = ICompressor.getCompressor(schema.getCompressor());
+ this.pageBuffer = pageBuffer;
+ }
+
public int getNumOfPages() {
return numOfPages;
}