You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ka...@apache.org on 2019/07/04 02:25:16 UTC

[incubator-iotdb] 01/01: init, create java class. And a constructor for PublicBAOS.

This is an automated email from the ASF dual-hosted git repository.

kangrong pushed a commit to branch feature_chunkbufferpool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit b5eb37a7cf12c854aa5f84d83124fbb4b7c04d13
Author: kr11 <30...@qq.com>
AuthorDate: Thu Jul 4 10:24:54 2019 +0800

    init, create java class. And a constructor for PublicBAOS.
---
 .../db/utils/datastructure/ListPublicBAOSPool.java | 166 +++++++++++++++++++++
 .../db/utils/datastructure/ListPublicBOAS.java     |  91 +++++++++++
 .../iotdb/tsfile/write/chunk/ChunkBuffer.java      |  11 ++
 3 files changed, 268 insertions(+)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/ListPublicBAOSPool.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/ListPublicBAOSPool.java
new file mode 100644
index 0000000..6e6743c
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/ListPublicBAOSPool.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils.datastructure;
+
+import java.util.ArrayDeque;
+import java.util.EnumMap;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+/**
+ * Just copy from {@linkplain org.apache.iotdb.db.rescon.PrimitiveArrayPool PrimitiveArrayPool}.
+ *
+ * Provide and recycle {@code byte[]} in {@linkplain ListPublicBOAS}.
+ *
+ * @author Pengze Lv, kangrong
+ */
+public class ListPublicBAOSPool {
+
+  /**
+   * data type -> Array<PrimitiveArray>
+   */
+  private static final EnumMap<TSDataType, ArrayDeque> primitiveArraysMap = new EnumMap<>(TSDataType.class);
+
+  public static final int ARRAY_SIZE = 128;
+
+  static {
+    primitiveArraysMap.put(TSDataType.BOOLEAN, new ArrayDeque());
+    primitiveArraysMap.put(TSDataType.INT32, new ArrayDeque());
+    primitiveArraysMap.put(TSDataType.INT64, new ArrayDeque());
+    primitiveArraysMap.put(TSDataType.FLOAT, new ArrayDeque());
+    primitiveArraysMap.put(TSDataType.DOUBLE, new ArrayDeque());
+    primitiveArraysMap.put(TSDataType.TEXT, new ArrayDeque());
+  }
+
+  public static ListPublicBAOSPool getInstance() {
+    return INSTANCE;
+  }
+
+  private static final ListPublicBAOSPool INSTANCE = new ListPublicBAOSPool();
+
+
+  private ListPublicBAOSPool() {}
+
+  public synchronized Object getPrimitiveDataListByType(TSDataType dataType) {
+    ArrayDeque dataListQueue = primitiveArraysMap.computeIfAbsent(dataType, k ->new ArrayDeque<>());
+    Object dataArray = dataListQueue.poll();
+    switch (dataType) {
+      case BOOLEAN:
+        if (dataArray == null) {
+          dataArray = new boolean[ARRAY_SIZE];
+        }
+        break;
+      case INT32:
+        if (dataArray == null) {
+          dataArray = new int[ARRAY_SIZE];
+        }
+        break;
+      case INT64:
+        if (dataArray == null) {
+          dataArray = new long[ARRAY_SIZE];
+        }
+        break;
+      case FLOAT:
+        if (dataArray == null) {
+          dataArray = new float[ARRAY_SIZE];
+        }
+        break;
+      case DOUBLE:
+        if (dataArray == null) {
+          dataArray = new double[ARRAY_SIZE];
+        }
+        break;
+      case TEXT:
+        if (dataArray == null) {
+          dataArray = new Binary[ARRAY_SIZE];
+        }
+        break;
+      default:
+        throw new UnSupportedDataTypeException("DataType: " + dataType);
+    }
+    return dataArray;
+  }
+
+
+  public synchronized void release(Object dataArray) {
+    if (dataArray instanceof boolean[]) {
+      primitiveArraysMap.get(TSDataType.BOOLEAN).add(dataArray);
+    } else if (dataArray instanceof int[]) {
+      primitiveArraysMap.get(TSDataType.INT32).add(dataArray);
+    } else if (dataArray instanceof long[]){
+      primitiveArraysMap.get(TSDataType.INT64).add(dataArray);
+    } else if (dataArray instanceof float[]) {
+      primitiveArraysMap.get(TSDataType.FLOAT).add(dataArray);
+    } else if (dataArray instanceof double[]) {
+      primitiveArraysMap.get(TSDataType.DOUBLE).add(dataArray);
+    } else if (dataArray instanceof Binary[]) {
+      primitiveArraysMap.get(TSDataType.TEXT).add(dataArray);
+    }
+  }
+
+  /**
+   * @param size needed capacity
+   * @return an array of primitive data arrays
+   */
+  public synchronized Object getDataListsByType(TSDataType dataType, int size) {
+    int arrayNumber = (int) Math.ceil((float) size / (float)ARRAY_SIZE);
+    switch (dataType) {
+      case BOOLEAN:
+        boolean[][] booleans = new boolean[arrayNumber][];
+        for (int i = 0; i < arrayNumber; i++) {
+          booleans[i] = (boolean[]) getPrimitiveDataListByType(dataType);
+        }
+        return booleans;
+      case INT32:
+        int[][] ints = new int[arrayNumber][];
+        for (int i = 0; i < arrayNumber; i++) {
+          ints[i] = (int[]) getPrimitiveDataListByType(dataType);
+        }
+        return ints;
+      case INT64:
+        long[][] longs = new long[arrayNumber][];
+        for (int i = 0; i < arrayNumber; i++) {
+          longs[i] = (long[]) getPrimitiveDataListByType(dataType);
+        }
+        return longs;
+      case FLOAT:
+        float[][] floats = new float[arrayNumber][];
+        for (int i = 0; i < arrayNumber; i++) {
+          floats[i] = (float[]) getPrimitiveDataListByType(dataType);
+        }
+        return floats;
+      case DOUBLE:
+        double[][] doubles = new double[arrayNumber][];
+        for (int i = 0; i < arrayNumber; i++) {
+          doubles[i] = (double[]) getPrimitiveDataListByType(dataType);
+        }
+        return doubles;
+      case TEXT:
+        Binary[][] binaries = new Binary[arrayNumber][];
+        for (int i = 0; i < arrayNumber; i++) {
+          binaries[i] = (Binary[]) getPrimitiveDataListByType(dataType);
+        }
+        return binaries;
+      default:
+        return null;
+    }
+  }
+
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/ListPublicBOAS.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/ListPublicBOAS.java
new file mode 100644
index 0000000..c6b7f96
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/ListPublicBOAS.java
@@ -0,0 +1,91 @@
+package org.apache.iotdb.db.utils.datastructure;
+
+import java.io.OutputStream;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
+/**
+ * A reimplementation of {@linkplain PublicBAOS} that is meant to replace the underlying
+ * {@linkplain java.io.ByteArrayOutputStream} buffer with a {@code List} of {@code byte[]}.
+ *
+ * For efficient and controllable GC, all {@code byte[]} are to be allocated from {@linkplain
+ * ListPublicBAOSPool} and should be put back after {@code close}. The design refers to
+ * {@linkplain TVList} and {@linkplain org.apache.iotdb.db.rescon.PrimitiveArrayPool
+ * PrimitiveArrayPool}. So far, this class is only used in {@linkplain
+ * org.apache.iotdb.db.engine.memtable.MemTableFlushTask MemTableFlushTask}.
+ *
+ * This is still a skeleton: both constructors and every method below currently throw
+ * {@link UnsupportedOperationException}.
+ *
+ * @author Pengze Lv, kangrong
+ */
+public class ListPublicBOAS extends PublicBAOS {
+  /** TODO: not implemented yet; always throws after the superclass constructor has run. */
+  public ListPublicBOAS() {
+    super();
+    throw new UnsupportedOperationException();
+  }
+
+  /** TODO: not implemented yet; always throws after the superclass constructor has run. */
+  public ListPublicBOAS(int size) {
+    super(size);
+    throw new UnsupportedOperationException();
+  }
+
+  /** TODO: not implemented yet; always throws. */
+  @Override
+  public synchronized void write(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** TODO: not implemented yet; always throws. */
+  @Override
+  public synchronized void write(int b) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** TODO: not implemented. NOTE(review): presumably overrides PublicBAOS#getBuf - confirm. */
+  public byte[] getBuf() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** TODO: not implemented yet; always throws. */
+  @Override
+  public synchronized void reset() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** TODO: not implemented yet; always throws. */
+  @Override
+  public synchronized int size() {
+    throw new UnsupportedOperationException();
+  }
+
+  /** TODO: not implemented yet; always throws. */
+  @Override
+  public void close() {
+    throw new UnsupportedOperationException();
+  }
+
+  // We are not sure whether the following functions will be invoked on a PublicBAOS. We defer
+  // implementing them, but throw an exception so that unexpected calls are noticed.
+  @Override
+  public synchronized void writeTo(OutputStream out) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized byte[] toByteArray() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized String toString() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized String toString(String charsetName) {
+    throw new UnsupportedOperationException();
+  }
+}
+
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
index 290c516..28caa1c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkBuffer.java
@@ -67,6 +67,17 @@ public class ChunkBuffer {
     this.pageBuffer = new PublicBAOS();
   }
 
+  /**
+   * Constructor of ChunkBuffer that uses a caller-supplied page buffer.
+   * @param schema measurement schema; its compressor setting selects the compressor
+   * @param pageBuffer stream stored as this chunk's page buffer
+   */
+  public ChunkBuffer(MeasurementSchema schema, PublicBAOS pageBuffer) {
+    this.pageBuffer = pageBuffer;
+    this.compressor = ICompressor.getCompressor(schema.getCompressor());
+    this.schema = schema;
+  }
+
   public int getNumOfPages() {
     return numOfPages;
   }