You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/29 02:58:17 UTC

[iotdb] branch auto_compressor created (now b1ecc6ddbbf)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch auto_compressor
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at b1ecc6ddbbf add auto compressor

This branch includes the following new commits:

     new b1ecc6ddbbf add auto compressor

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: add auto compressor

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch auto_compressor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b1ecc6ddbbfab00a9d7a3fb9c5f3816bbc58c9bd
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon May 29 11:01:08 2023 +0800

    add auto compressor
---
 .../apache/iotdb/tsfile/compress/ICompressor.java  |   3 +
 .../iotdb/tsfile/compress/auto/AutoCompressor.java | 116 ++++++++++
 .../tsfile/compress/auto/AutoUncompressor.java     |  74 +++++++
 .../tsfile/compress/auto/CompressionSampler.java   | 246 +++++++++++++++++++++
 .../file/metadata/enums/CompressionType.java       |   5 +-
 5 files changed, 443 insertions(+), 1 deletion(-)

diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java
index ed6d9bdcc59..84e2614c67d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.tsfile.compress;
 
+import org.apache.iotdb.tsfile.compress.auto.AutoCompressor;
 import org.apache.iotdb.tsfile.exception.compress.CompressionTypeNotSupportedException;
 import org.apache.iotdb.tsfile.exception.compress.GZIPCompressOverflowException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -75,6 +76,8 @@ public interface ICompressor extends Serializable {
         return new ZstdCompressor();
       case LZMA2:
         return new LZMA2Compressor();
+      case AUTO:
+        return new AutoCompressor();
       default:
         throw new CompressionTypeNotSupportedException(name.toString());
     }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoCompressor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoCompressor.java
new file mode 100644
index 00000000000..62f2877a534
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoCompressor.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.compress.auto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+
+/**
+ * Compressor registered for {@link CompressionType#AUTO}.
+ *
+ * <p>Each call either asks the {@link CompressionSampler} to try all candidate codecs (when the
+ * sampler reports that a sample is due) or compresses with the sampler's currently preferred
+ * codec. In both cases the output is the codec's payload followed by ONE trailing byte holding
+ * the serialized {@link CompressionType} of the codec that produced the payload.
+ *
+ * <p>NOTE(review): no synchronization is performed here; confirm that an instance is never
+ * shared between threads.
+ */
+public class AutoCompressor implements ICompressor {
+
+  /** Default {@code alpha} handed to the sampler. */
+  private static final double DEFAULT_ALPHA = 1.0;
+  /** Default minimal interval between two samples, in milliseconds. */
+  private static final long DEFAULT_MIN_SAMPLE_INTERVAL_MS = 1000;
+  /** Number of bytes appended to every output to record the codec that produced it. */
+  private static final int TYPE_TAG_LENGTH = 1;
+
+  private final CompressionSampler sampler;
+
+  /** Creates a compressor with the default alpha (1.0) and sample interval (1000 ms). */
+  public AutoCompressor() {
+    this(DEFAULT_ALPHA, DEFAULT_MIN_SAMPLE_INTERVAL_MS);
+  }
+
+  /**
+   * @param alpha passed through to the {@link CompressionSampler}
+   * @param minSampleIntervalMS minimal time between two samples, passed through to the sampler
+   */
+  public AutoCompressor(double alpha, long minSampleIntervalMS) {
+    sampler = new CompressionSampler(collectCompressionTypes(), alpha, minSampleIntervalMS);
+  }
+
+  /** Lists every compression type except AUTO itself and UNCOMPRESSED. */
+  private static List<CompressionType> collectCompressionTypes() {
+    CompressionType[] allTypes = CompressionType.values();
+    List<CompressionType> candidates = new ArrayList<>(allTypes.length);
+    for (CompressionType type : allTypes) {
+      if (type != CompressionType.AUTO && type != CompressionType.UNCOMPRESSED) {
+        candidates.add(type);
+      }
+    }
+    return candidates;
+  }
+
+  /** Returns a copy of {@code payload} with the serialized {@code type} appended. */
+  private static byte[] appendTypeTag(byte[] payload, CompressionType type) {
+    byte[] tagged = new byte[payload.length + TYPE_TAG_LENGTH];
+    System.arraycopy(payload, 0, tagged, 0, payload.length);
+    tagged[payload.length] = type.serialize();
+    return tagged;
+  }
+
+  /**
+   * Compresses the whole array.
+   *
+   * @return the compressed payload followed by the one-byte type tag
+   */
+  @Override
+  public byte[] compress(byte[] data) throws IOException {
+    if (sampler.shouldSample()) {
+      return sampler.sample(data);
+    }
+    ICompressor preferred = sampler.getPreferredSampler();
+    return appendTypeTag(preferred.compress(data), preferred.getType());
+  }
+
+  /**
+   * Compresses {@code length} bytes of {@code data} starting at {@code offset}.
+   *
+   * @return the compressed payload followed by the one-byte type tag
+   */
+  @Override
+  public byte[] compress(byte[] data, int offset, int length) throws IOException {
+    if (sampler.shouldSample()) {
+      return sampler.sample(data, offset, length);
+    }
+    ICompressor preferred = sampler.getPreferredSampler();
+    return appendTypeTag(preferred.compress(data, offset, length), preferred.getType());
+  }
+
+  /**
+   * Compresses into a caller-provided array. The array must hold at least {@link
+   * #getMaxBytesForCompression(int)} bytes for the given {@code length}.
+   *
+   * @return number of bytes written to {@code compressed}, including the type tag
+   */
+  @Override
+  public int compress(byte[] data, int offset, int length, byte[] compressed) throws IOException {
+    if (sampler.shouldSample()) {
+      return sampler.sample(data, offset, length, compressed);
+    }
+    ICompressor preferred = sampler.getPreferredSampler();
+    int payloadLength = preferred.compress(data, offset, length, compressed);
+    compressed[payloadLength] = preferred.getType().serialize();
+    return payloadLength + TYPE_TAG_LENGTH;
+  }
+
+  /**
+   * Compresses from one buffer into another.
+   *
+   * <p>NOTE(review): the tag is placed at {@code compressed.position() + payloadLength}, which
+   * assumes the delegate leaves {@code compressed.position()} at the start of the payload --
+   * confirm for every codec.
+   *
+   * @return number of bytes produced, including the type tag
+   */
+  @Override
+  public int compress(ByteBuffer data, ByteBuffer compressed) throws IOException {
+    if (sampler.shouldSample()) {
+      return sampler.sample(data, compressed);
+    }
+    ICompressor preferred = sampler.getPreferredSampler();
+    int payloadLength = preferred.compress(data, compressed);
+    // Absolute put: writes the tag without moving the position or clobbering the caller's mark.
+    compressed.put(compressed.position() + payloadLength, preferred.getType().serialize());
+    return payloadLength + TYPE_TAG_LENGTH;
+  }
+
+  /**
+   * Upper bound of the output size for an input of {@code uncompressedDataSize} bytes.
+   *
+   * <p>The bound covers the largest bound of any candidate codec, the raw input size (sampling
+   * may store the data as UNCOMPRESSED when no codec shrinks it), and the trailing type tag.
+   */
+  @Override
+  public int getMaxBytesForCompression(int uncompressedDataSize) {
+    int codecBound = sampler.getMaxBytesForCompression(uncompressedDataSize);
+    return Math.max(codecBound, uncompressedDataSize) + TYPE_TAG_LENGTH;
+  }
+
+  @Override
+  public CompressionType getType() {
+    return CompressionType.AUTO;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoUncompressor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoUncompressor.java
new file mode 100644
index 00000000000..037ffedabd7
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/AutoUncompressor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.compress.auto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+
+/**
+ * Uncompressor for data whose LAST byte is the serialized {@link CompressionType} of the codec
+ * that produced the preceding payload.
+ *
+ * <p>Every method reads that trailing tag, looks up the matching {@link IUnCompressor} and hands
+ * it the payload WITHOUT the tag byte.
+ */
+public class AutoUncompressor implements IUnCompressor {
+
+  /** Number of trailing bytes that hold the real compression type. */
+  private static final int TYPE_TAG_LENGTH = 1;
+
+  /** Rejects inputs that are too short to contain the type tag. */
+  private static void requireTagged(int length) throws IOException {
+    if (length < TYPE_TAG_LENGTH) {
+      throw new IOException(
+          "AUTO-compressed data must end with a one-byte compression type tag, but the input "
+              + "has length "
+              + length);
+    }
+  }
+
+  /**
+   * Resolves the uncompressor named by the trailing tag.
+   *
+   * @throws IOException if the tag itself says AUTO, which would otherwise resolve to an AUTO
+   *     uncompressor again instead of a concrete codec
+   */
+  private static IUnCompressor delegateFor(byte typeTag) throws IOException {
+    CompressionType realType = CompressionType.deserialize(typeTag);
+    if (realType == CompressionType.AUTO) {
+      throw new IOException("AUTO-compressed data must not be tagged with AUTO itself");
+    }
+    return IUnCompressor.getUnCompressor(realType);
+  }
+
+  /**
+   * @param length number of bytes of tagged data, including the trailing type tag
+   * @return the uncompressed length reported by the real codec for the payload
+   */
+  @Override
+  public int getUncompressedLength(byte[] array, int offset, int length) throws IOException {
+    requireTagged(length);
+    int payloadLength = length - TYPE_TAG_LENGTH;
+    IUnCompressor delegate = delegateFor(array[offset + payloadLength]);
+    return delegate.getUncompressedLength(array, offset, payloadLength);
+  }
+
+  /**
+   * Treats the remaining bytes of {@code buffer} as payload followed by the type tag. The
+   * buffer's limit is lowered by one while the real codec runs and restored afterwards.
+   */
+  @Override
+  public int getUncompressedLength(ByteBuffer buffer) throws IOException {
+    requireTagged(buffer.remaining());
+    int originalLimit = buffer.limit();
+    // Absolute get works for heap, direct and read-only buffers alike.
+    IUnCompressor delegate = delegateFor(buffer.get(originalLimit - TYPE_TAG_LENGTH));
+    buffer.limit(originalLimit - TYPE_TAG_LENGTH);
+    try {
+      return delegate.getUncompressedLength(buffer);
+    } finally {
+      buffer.limit(originalLimit);
+    }
+  }
+
+  /**
+   * @param byteArray payload followed by the one-byte type tag
+   * @return the data uncompressed by the real codec
+   */
+  @Override
+  public byte[] uncompress(byte[] byteArray) throws IOException {
+    requireTagged(byteArray.length);
+    int payloadLength = byteArray.length - TYPE_TAG_LENGTH;
+    IUnCompressor delegate = delegateFor(byteArray[payloadLength]);
+    // The delegate takes a whole array, so the tag has to be cut off with a copy.
+    byte[] payload = new byte[payloadLength];
+    System.arraycopy(byteArray, 0, payload, 0, payloadLength);
+    return delegate.uncompress(payload);
+  }
+
+  /**
+   * @param length number of bytes of tagged data, including the trailing type tag
+   * @return whatever the real codec returns for the payload
+   */
+  @Override
+  public int uncompress(byte[] byteArray, int offset, int length, byte[] output, int outOffset)
+      throws IOException {
+    requireTagged(length);
+    int payloadLength = length - TYPE_TAG_LENGTH;
+    IUnCompressor delegate = delegateFor(byteArray[offset + payloadLength]);
+    return delegate.uncompress(byteArray, offset, payloadLength, output, outOffset);
+  }
+
+  /**
+   * Treats the remaining bytes of {@code compressed} as payload followed by the type tag. The
+   * buffer's limit is lowered by one while the real codec runs and restored afterwards.
+   */
+  @Override
+  public int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException {
+    requireTagged(compressed.remaining());
+    int originalLimit = compressed.limit();
+    IUnCompressor delegate = delegateFor(compressed.get(originalLimit - TYPE_TAG_LENGTH));
+    compressed.limit(originalLimit - TYPE_TAG_LENGTH);
+    try {
+      return delegate.uncompress(compressed, uncompressed);
+    } finally {
+      compressed.limit(originalLimit);
+    }
+  }
+
+  @Override
+  public CompressionType getCodecName() {
+    return CompressionType.AUTO;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/CompressionSampler.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/CompressionSampler.java
new file mode 100644
index 00000000000..8358c4896e5
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/auto/CompressionSampler.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.compress.auto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs every candidate codec over the same input, records size and time per codec, and keeps
+ * track of the codec with the best score.
+ *
+ * <p>Every {@code sample} method returns the SMALLEST output of the current round (or the raw
+ * input when no codec shrinks it) followed by one byte holding the serialized {@link
+ * CompressionType} that produced it. The preferred codec, in contrast, is chosen from the
+ * accumulated statistics: {@code throughput^alpha / (compressedBytes / rawBytes)}.
+ *
+ * <p>NOTE(review): no synchronization is performed here; confirm that an instance is never
+ * shared between threads.
+ */
+public class CompressionSampler {
+
+  private static final Logger logger = LoggerFactory.getLogger(CompressionSampler.class);
+
+  /** Number of most recent samples each monitor keeps. */
+  private static final int MAX_SAMPLE_NUM = 10;
+  /** Number of bytes appended to every output to record the codec that produced it. */
+  private static final int TYPE_TAG_LENGTH = 1;
+
+  // compressionTypes, compressors and monitors are index-aligned.
+  private final List<CompressionType> compressionTypes;
+  private final List<ICompressor> compressors;
+  private final List<CompressionMonitor> monitors;
+  /** Minimal time between two samples, in milliseconds. */
+  private final long minSampleInterval;
+  private long lastSampleTimeMS;
+  private int preferredCompressorIndex;
+
+  /**
+   * @param compressionTypes candidate codecs; one compressor and one monitor is created for each
+   * @param alpha exponent applied to the throughput when scoring a codec
+   * @param minSampleInterval minimal time between two samples, in milliseconds
+   */
+  public CompressionSampler(List<CompressionType> compressionTypes, double alpha,
+      long minSampleInterval) {
+    // Private copy so that later changes to the caller's list cannot break index alignment.
+    this.compressionTypes = new ArrayList<>(compressionTypes);
+    this.minSampleInterval = minSampleInterval;
+    this.monitors = new ArrayList<>(compressionTypes.size());
+    this.compressors = new ArrayList<>(compressionTypes.size());
+
+    for (CompressionType compressionType : this.compressionTypes) {
+      monitors.add(new CompressionMonitor(MAX_SAMPLE_NUM, alpha));
+      compressors.add(ICompressor.getCompressor(compressionType));
+    }
+  }
+
+  /** Returns true when at least {@code minSampleInterval} ms passed since the last sample. */
+  public boolean shouldSample() {
+    return System.currentTimeMillis() - lastSampleTimeMS >= minSampleInterval;
+  }
+
+  /** Returns the compressor that currently has the best accumulated score. */
+  public ICompressor getPreferredSampler() {
+    return compressors.get(preferredCompressorIndex);
+  }
+
+  /** Samples the whole array; see {@link #sample(byte[], int, int)}. */
+  public byte[] sample(byte[] data) throws IOException {
+    return sample(data, 0, data.length);
+  }
+
+  /**
+   * Compresses {@code data[offset, offset + length)} with every candidate codec, records one
+   * sample per codec and refreshes the preferred codec.
+   *
+   * @return the smallest output of this round followed by its one-byte type tag; when no codec
+   *     produces fewer than {@code length} bytes, the raw slice tagged as UNCOMPRESSED
+   */
+  public byte[] sample(byte[] data, int offset, int length) throws IOException {
+    CompressionType bestType = CompressionType.UNCOMPRESSED;
+    byte[] bestResult = null; // null means "store the raw slice"
+    int smallestLength = length;
+
+    for (int i = 0; i < compressors.size(); i++) {
+      long startTimeNS = System.nanoTime();
+      byte[] candidate = compressors.get(i).compress(data, offset, length);
+      long timeConsumptionNS = System.nanoTime() - startTimeNS;
+      // The input of this round is the slice, not the whole backing array.
+      monitors.get(i).addSample(length, candidate.length, timeConsumptionNS);
+
+      if (candidate.length < smallestLength) {
+        smallestLength = candidate.length;
+        bestType = compressionTypes.get(i);
+        bestResult = candidate;
+      }
+    }
+
+    lastSampleTimeMS = System.currentTimeMillis();
+    updatePreferredIndex();
+
+    byte[] result = new byte[smallestLength + TYPE_TAG_LENGTH];
+    if (bestResult == null) {
+      System.arraycopy(data, offset, result, 0, length);
+    } else {
+      System.arraycopy(bestResult, 0, result, 0, smallestLength);
+    }
+    result[smallestLength] = bestType.serialize();
+    return result;
+  }
+
+  /**
+   * Same as {@link #sample(byte[], int, int)}, but copies the tagged result to the start of
+   * {@code compressed}.
+   *
+   * <p>The candidates are compressed into temporary arrays, so {@code compressed} ends up
+   * holding the best output rather than the output of whichever codec ran last.
+   *
+   * @return number of bytes written to {@code compressed}, including the type tag
+   */
+  public int sample(byte[] data, int offset, int length, byte[] compressed) throws IOException {
+    byte[] tagged = sample(data, offset, length);
+    System.arraycopy(tagged, 0, compressed, 0, tagged.length);
+    return tagged.length;
+  }
+
+  /**
+   * Same as {@link #sample(byte[], int, int)} for buffers: the remaining bytes of {@code data}
+   * are the input and the tagged result is written at {@code compressed.position()}.
+   *
+   * <p>Both buffers are accessed through duplicates, so their positions, limits and marks are
+   * left unchanged.
+   *
+   * @return number of bytes written into {@code compressed}, including the type tag
+   */
+  public int sample(ByteBuffer data, ByteBuffer compressed) throws IOException {
+    byte[] raw = new byte[data.remaining()];
+    data.duplicate().get(raw);
+    byte[] tagged = sample(raw, 0, raw.length);
+    compressed.duplicate().put(tagged);
+    return tagged.length;
+  }
+
+  /**
+   * Largest payload any candidate codec may produce for {@code uncompressedDataSize} input
+   * bytes, and never less than the input size itself (the raw fallback).
+   *
+   * <p>The value does NOT include the one-byte type tag that the {@code sample} methods append;
+   * callers sizing an output buffer must reserve that extra byte.
+   */
+  public int getMaxBytesForCompression(int uncompressedDataSize) {
+    int maxBytes = uncompressedDataSize;
+    for (ICompressor compressor : compressors) {
+      maxBytes = Math.max(maxBytes, compressor.getMaxBytesForCompression(uncompressedDataSize));
+    }
+    return maxBytes;
+  }
+
+  /** Makes the codec with the highest positive score the preferred one and logs a change. */
+  private void updatePreferredIndex() {
+    int prevIndex = preferredCompressorIndex;
+    int bestIndex = prevIndex;
+    double bestScore = 0;
+    for (int i = 0; i < monitors.size(); i++) {
+      double score = monitors.get(i).score();
+      if (score > bestScore) {
+        // Remember the running maximum; otherwise the last positive score always wins.
+        bestScore = score;
+        bestIndex = i;
+      }
+    }
+    preferredCompressorIndex = bestIndex;
+    if (prevIndex != bestIndex) {
+      logger.info("Preferred compressor changed to {}", compressionTypes.get(bestIndex));
+    }
+  }
+
+  /** One measurement: input size, output size and time spent, for a single compression. */
+  private static class CompressionSample {
+
+    private final long bytesBeforeCompression;
+    private final long bytesAfterCompression;
+    private final long timeConsumptionNS;
+
+    public CompressionSample(long bytesBeforeCompression, long bytesAfterCompression,
+        long timeConsumptionNS) {
+      this.bytesBeforeCompression = bytesBeforeCompression;
+      this.bytesAfterCompression = bytesAfterCompression;
+      this.timeConsumptionNS = timeConsumptionNS;
+    }
+  }
+
+  /** Sliding window over the most recent samples of one codec, with running sums. */
+  private static class CompressionMonitor {
+
+    private final Queue<CompressionSample> samples;
+    private final int maxSampleNum;
+    private final double alpha;
+    private long bytesBeforeCompressionSum;
+    private long bytesAfterCompressionSum;
+    private long timeConsumptionSumNS;
+
+    private CompressionMonitor(int maxSampleNum, double alpha) {
+      this.maxSampleNum = maxSampleNum;
+      this.samples = new ArrayDeque<>(Math.max(1, maxSampleNum));
+      this.alpha = alpha;
+    }
+
+    /** Compressed bytes divided by raw bytes over the window; smaller is better. */
+    private double compressionRatio() {
+      return bytesAfterCompressionSum * 1.0 / bytesBeforeCompressionSum;
+    }
+
+    /** Raw bytes per nanosecond over the window. */
+    private double throughput() {
+      // At least 1 ns, so that a zero total time cannot yield an infinite throughput.
+      return bytesBeforeCompressionSum * 1.0 / Math.max(1L, timeConsumptionSumNS);
+    }
+
+    /**
+     * {@code throughput^alpha / compressionRatio}; higher is better. Returns 0 while the window
+     * holds no usable data, so such a codec is never preferred.
+     */
+    private double score() {
+      if (bytesBeforeCompressionSum <= 0 || bytesAfterCompressionSum <= 0) {
+        return 0;
+      }
+      return Math.pow(throughput(), alpha) / compressionRatio();
+    }
+
+    /** Records a new sample, evicting the oldest ones when the window is full. */
+    private void addSample(long bytesBeforeCompression, long bytesAfterCompression,
+        long timeConsumptionNS) {
+      CompressionSample sample = new CompressionSample(bytesBeforeCompression,
+          bytesAfterCompression, timeConsumptionNS);
+      while (!samples.isEmpty() && samples.size() >= maxSampleNum) {
+        removeSample();
+      }
+      addSample(sample);
+    }
+
+    private void addSample(CompressionSample sample) {
+      bytesAfterCompressionSum += sample.bytesAfterCompression;
+      bytesBeforeCompressionSum += sample.bytesBeforeCompression;
+      timeConsumptionSumNS += sample.timeConsumptionNS;
+      samples.add(sample);
+    }
+
+    private void removeSample() {
+      CompressionSample sample = samples.remove();
+      bytesBeforeCompressionSum -= sample.bytesBeforeCompression;
+      bytesAfterCompressionSum -= sample.bytesAfterCompression;
+      timeConsumptionSumNS -= sample.timeConsumptionNS;
+    }
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
index 9afa137f27c..86ea6db4dee 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
@@ -34,7 +34,8 @@ public enum CompressionType {
   /** ZSTD */
   ZSTD(".zstd", (byte) 8),
   /** LZMA2 */
-  LZMA2(".lzma2", (byte) 9);
+  LZMA2(".lzma2", (byte) 9),
+  AUTO(".auto", (byte) 10);
 
   private final String extensionName;
   private final byte index;
@@ -64,6 +65,8 @@ public enum CompressionType {
         return CompressionType.ZSTD;
       case 9:
         return CompressionType.LZMA2;
+      case 10:
+        return CompressionType.AUTO;
       default:
         throw new IllegalArgumentException("Invalid input: " + compressor);
     }