You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/03/31 16:09:07 UTC

[iotdb] 01/01: add LZMA compression

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch add-lzma-compress
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit acd534a5400f0d199eaa0f945da81c2312d4d309
Author: LiuXuxin <li...@outlook.com>
AuthorDate: Sat Apr 1 00:08:36 2023 +0800

    add LZMA compression
---
 .../main/java/org/apache/iotdb/SessionExample.java | 121 ++++++---------------
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   2 +-
 tsfile/pom.xml                                     |  10 ++
 .../apache/iotdb/tsfile/compress/ICompressor.java  |  68 ++++++++++++
 .../iotdb/tsfile/compress/IUnCompressor.java       |  62 +++++++++++
 .../compress/LZMACompressOverflowException.java    |  26 +++++
 .../file/metadata/enums/CompressionType.java       |   5 +-
 7 files changed, 203 insertions(+), 91 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 953b7fd1da..1f9b80521e 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.isession.template.Template;
 import org.apache.iotdb.isession.util.Version;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.template.MeasurementNode;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -70,51 +69,8 @@ public class SessionExample {
             .version(Version.V_1_0)
             .build();
     session.open(false);
-
-    // set session fetchSize
-    session.setFetchSize(10000);
-
-    try {
-      session.createDatabase("root.sg1");
-    } catch (StatementExecutionException e) {
-      if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
-        throw e;
-      }
-    }
-
-    //     createTemplate();
-    createTimeseries();
-    createMultiTimeseries();
-    insertRecord();
     insertTablet();
-    //    insertTabletWithNullValues();
-    //    insertTablets();
-    //    insertRecords();
-    //    insertText();
-    //    selectInto();
-    //    createAndDropContinuousQueries();
-    //    nonQuery();
-    query();
-    //    queryWithTimeout();
-    rawDataQuery();
-    lastDataQuery();
-    aggregationQuery();
-    groupByQuery();
-    //    queryByIterator();
-    //    deleteData();
-    //    deleteTimeseries();
-    //    setTimeout();
-
-    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
-    sessionEnableRedirect.setEnableQueryRedirection(true);
-    sessionEnableRedirect.open(false);
-
-    // set session fetchSize
-    sessionEnableRedirect.setFetchSize(10000);
-
-    insertRecord4Redirect();
-    query4Redirect();
-    sessionEnableRedirect.close();
+
     session.close();
   }
 
@@ -386,58 +342,45 @@ public class SessionExample {
      */
     // The schema of measurements of one device
     // only measurementId and data type in MeasurementSchema take effects in Tablet
+    long startTime = System.currentTimeMillis();
     List<MeasurementSchema> schemaList = new ArrayList<>();
-    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
-    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
-    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
-
-    Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 100);
-
-    // Method 1 to add tablet data
-    long timestamp = System.currentTimeMillis();
-
-    for (long row = 0; row < 100; row++) {
-      int rowIndex = tablet.rowSize++;
-      tablet.addTimestamp(rowIndex, timestamp);
-      for (int s = 0; s < 3; s++) {
-        long value = new Random().nextLong();
-        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
-      }
-      if (tablet.rowSize == tablet.getMaxRowNumber()) {
-        session.insertTablet(tablet, true);
-        tablet.reset();
+    for (int i = 1; i <= 100; i++) {
+      schemaList.add(
+          new MeasurementSchema(
+              "s" + i, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.SNAPPY));
+    }
+    Random random = new Random();
+    for (int i = 1; i <= 1000; i++) {
+      String device = "root.sg.d" + i;
+      Tablet tablet = new Tablet(device, schemaList, 100);
+      // Method 1 to add tablet data
+      long timestamp = System.currentTimeMillis();
+
+      for (long row = 0; row < 10000; row++) {
+        int rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, timestamp);
+        for (int s = 0; s < 100; s++) {
+          long value = Math.abs(random.nextLong() % 1000);
+          tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+        }
+        if (tablet.rowSize == tablet.getMaxRowNumber()) {
+          session.insertTablet(tablet, true);
+          tablet.reset();
+        }
+        timestamp++;
       }
-      timestamp++;
-    }
-
-    if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
-      tablet.reset();
-    }
 
-    // Method 2 to add tablet data
-    long[] timestamps = tablet.timestamps;
-    Object[] values = tablet.values;
-
-    for (long time = 0; time < 100; time++) {
-      int row = tablet.rowSize++;
-      timestamps[row] = time;
-      for (int i = 0; i < 3; i++) {
-        long[] sensor = (long[]) values[i];
-        sensor[row] = i;
-      }
-      if (tablet.rowSize == tablet.getMaxRowNumber()) {
-        session.insertTablet(tablet, true);
+      if (tablet.rowSize != 0) {
+        session.insertTablet(tablet);
         tablet.reset();
       }
+      System.out.println(i);
     }
-
-    if (tablet.rowSize != 0) {
-      session.insertTablet(tablet);
-      tablet.reset();
-    }
+    System.out.println("Time Cost: " + (System.currentTimeMillis() - startTime));
   }
 
+  /**
+   * Produces a random string for use as sample data.
+   *
+   * <p>The previous empty body did not compile, because a method declared to return {@code
+   * String} must return a value.
+   *
+   * @return the canonical text form of a freshly generated random UUID
+   */
+  public static String getRandomString() {
+    // Fully qualified so that no additional import is needed in this file.
+    return java.util.UUID.randomUUID().toString();
+  }
+
   private static void insertTabletWithNullValues()
       throws IoTDBConnectionException, StatementExecutionException {
     /*
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b0f97c2288..cdc69fe2e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -397,7 +397,7 @@ public class IoTDBConfig {
   private int avgSeriesPointNumberThreshold = 100000;
 
   /** Enable inner space compaction for sequence files */
-  private boolean enableSeqSpaceCompaction = true;
+  private boolean enableSeqSpaceCompaction = false;
 
   /** Enable inner space compaction for unsequence files */
   private boolean enableUnseqSpaceCompaction = true;
diff --git a/tsfile/pom.xml b/tsfile/pom.xml
index e62c393878..5c1e65ad04 100644
--- a/tsfile/pom.xml
+++ b/tsfile/pom.xml
@@ -104,6 +104,16 @@
             <artifactId>iotdb-antlr</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+            <version>1.23.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.tukaani</groupId>
+            <artifactId>xz</artifactId>
+            <version>1.9</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java
index 84654e2a00..217941da7a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.tsfile.compress;
 
 import org.apache.iotdb.tsfile.exception.compress.CompressionTypeNotSupportedException;
 import org.apache.iotdb.tsfile.exception.compress.GZIPCompressOverflowException;
+import org.apache.iotdb.tsfile.exception.compress.LZMACompressOverflowException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 
 import com.github.luben.zstd.Zstd;
 import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
+import org.apache.commons.compress.compressors.lzma.LZMACompressorOutputStream;
 import org.xerial.snappy.Snappy;
 
 import java.io.ByteArrayInputStream;
@@ -38,6 +40,7 @@ import java.util.zip.GZIPOutputStream;
 
 import static org.apache.iotdb.tsfile.file.metadata.enums.CompressionType.GZIP;
 import static org.apache.iotdb.tsfile.file.metadata.enums.CompressionType.LZ4;
+import static org.apache.iotdb.tsfile.file.metadata.enums.CompressionType.LZMA;
 import static org.apache.iotdb.tsfile.file.metadata.enums.CompressionType.SNAPPY;
 import static org.apache.iotdb.tsfile.file.metadata.enums.CompressionType.ZSTD;
 
@@ -69,6 +72,8 @@ public interface ICompressor extends Serializable {
         return new GZIPCompressor();
       case ZSTD:
         return new ZstdCompressor();
+      case LZMA:
+        return new LzmaCompression();
       default:
         throw new CompressionTypeNotSupportedException(name.toString());
     }
@@ -370,4 +375,67 @@ public interface ICompressor extends Serializable {
       return ZSTD;
     }
   }
+
+  /**
+   * {@link ICompressor} backed by commons-compress' {@link LZMACompressorOutputStream}. Every call
+   * encodes its input as one self-contained LZMA stream.
+   */
+  class LzmaCompression implements ICompressor {
+
+    /**
+     * Compresses the whole array.
+     *
+     * @param data bytes to compress
+     * @return a newly allocated array holding the compressed bytes
+     * @throws IOException if the LZMA encoder reports an I/O error
+     */
+    @Override
+    public byte[] compress(byte[] data) throws IOException {
+      return compress(data, 0, data.length);
+    }
+
+    /**
+     * Compresses {@code length} bytes of {@code data}, starting at {@code offset}.
+     *
+     * @param data source array
+     * @param offset index of the first byte to compress
+     * @param length number of bytes to compress
+     * @return a newly allocated array holding the compressed bytes
+     * @throws IOException if the LZMA encoder reports an I/O error
+     */
+    @Override
+    public byte[] compress(byte[] data, int offset, int length) throws IOException {
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      // try-with-resources closes the encoder even when write() throws. The encoder is closed
+      // before toByteArray() is called, matching the original ordering.
+      try (LZMACompressorOutputStream lzmaOutputStream =
+          new LZMACompressorOutputStream(outputStream)) {
+        // The encoder reads the requested range in place; no intermediate copy is needed.
+        lzmaOutputStream.write(data, offset, length);
+      }
+      return outputStream.toByteArray();
+    }
+
+    /**
+     * Compresses a range of {@code data} and copies the result to the start of {@code compressed}.
+     *
+     * @param data source array
+     * @param offset index of the first byte to compress
+     * @param length number of bytes to compress
+     * @param compressed destination array, filled from index 0
+     * @return the number of compressed bytes written to {@code compressed}
+     * @throws IOException if the LZMA encoder reports an I/O error
+     * @throws LZMACompressOverflowException if the result is longer than {@code compressed}
+     */
+    @Override
+    public int compress(byte[] data, int offset, int length, byte[] compressed) throws IOException {
+      byte[] res = this.compress(data, offset, length);
+      if (res.length > compressed.length) {
+        throw new LZMACompressOverflowException();
+      }
+      System.arraycopy(res, 0, compressed, 0, res.length);
+      return res.length;
+    }
+
+    /**
+     * Compresses the remaining bytes of {@code data} and puts the result into {@code compressed}
+     * at its current position. Both buffers' positions advance.
+     *
+     * @param data source buffer; all remaining bytes are consumed
+     * @param compressed destination buffer
+     * @return the number of compressed bytes put into {@code compressed}
+     * @throws IOException if the LZMA encoder reports an I/O error
+     * @throws LZMACompressOverflowException if the result exceeds {@code compressed.remaining()}
+     */
+    @Override
+    public int compress(ByteBuffer data, ByteBuffer compressed) throws IOException {
+      byte[] dataBefore = new byte[data.remaining()];
+      data.get(dataBefore);
+      byte[] res = this.compress(dataBefore);
+      // Check remaining(), not capacity(): put() writes from the current position, so a buffer
+      // that is already partly filled can overflow even when its capacity is large enough.
+      if (res.length > compressed.remaining()) {
+        throw new LZMACompressOverflowException();
+      }
+      compressed.put(res);
+      return res.length;
+    }
+
+    /**
+     * Returns the buffer size callers should reserve: the input size plus 4 KiB of slack.
+     *
+     * <p>NOTE(review): nothing here establishes that 4 KiB bounds LZMA's worst-case expansion for
+     * large incompressible inputs - confirm against the encoder's documentation.
+     */
+    @Override
+    public int getMaxBytesForCompression(int uncompressedDataSize) {
+      return uncompressedDataSize + 1024 * 4;
+    }
+
+    @Override
+    public CompressionType getType() {
+      return LZMA;
+    }
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java
index d12985d759..d583eff2cf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java
@@ -26,10 +26,13 @@ import com.github.luben.zstd.Zstd;
 import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.lz4.LZ4SafeDecompressor;
+import org.apache.commons.compress.compressors.lzma.LZMACompressorInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xerial.snappy.Snappy;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -356,4 +359,63 @@ public interface IUnCompressor {
       return CompressionType.ZSTD;
     }
   }
+
+  /**
+   * {@link IUnCompressor} that decodes LZMA streams through commons-compress' {@link
+   * LZMACompressorInputStream}.
+   */
+  class LzmaUnCompressor implements IUnCompressor {
+
+    /** Not supported by this implementation; always throws. */
+    @Override
+    public int getUncompressedLength(byte[] array, int offset, int length) throws IOException {
+      throw new UnsupportedOperationException("unsupported get uncompress length");
+    }
+
+    /** Not supported by this implementation; always throws. */
+    @Override
+    public int getUncompressedLength(ByteBuffer buffer) throws IOException {
+      throw new UnsupportedOperationException("unsupported get uncompress length");
+    }
+
+    /**
+     * Decompresses a complete LZMA stream.
+     *
+     * @param byteArray the compressed bytes
+     * @return a newly allocated array holding the decompressed bytes
+     * @throws IOException if the LZMA decoder reports an I/O error
+     */
+    @Override
+    public byte[] uncompress(byte[] byteArray) throws IOException {
+      return decode(byteArray, 0, byteArray.length);
+    }
+
+    /**
+     * Decompresses {@code length} bytes of {@code byteArray}, starting at {@code offset}, and
+     * copies the result into {@code output} at {@code outOffset}.
+     *
+     * @return the number of decompressed bytes written to {@code output}
+     * @throws IOException if the LZMA decoder reports an I/O error
+     */
+    @Override
+    public int uncompress(byte[] byteArray, int offset, int length, byte[] output, int outOffset)
+        throws IOException {
+      byte[] res = decode(byteArray, offset, length);
+      System.arraycopy(res, 0, output, outOffset, res.length);
+      return res.length;
+    }
+
+    /**
+     * Decompresses the remaining bytes of {@code compressed} and puts the result into {@code
+     * uncompressed} at its current position. Both buffers' positions advance.
+     *
+     * @return the number of decompressed bytes put into {@code uncompressed}
+     * @throws IOException if the LZMA decoder reports an I/O error
+     */
+    @Override
+    public int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException {
+      byte[] dataBefore = new byte[compressed.remaining()];
+      compressed.get(dataBefore);
+
+      byte[] res = decode(dataBefore, 0, dataBefore.length);
+      uncompressed.put(res);
+
+      return res.length;
+    }
+
+    @Override
+    public CompressionType getCodecName() {
+      return CompressionType.LZMA;
+    }
+
+    /** Decodes the LZMA stream stored in {@code source[offset, offset + length)}. */
+    private static byte[] decode(byte[] source, int offset, int length) throws IOException {
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      // ByteArrayInputStream reads the requested range in place, so no copy of the input is made.
+      // try-with-resources closes the decoder even when the stream is malformed.
+      try (LZMACompressorInputStream lzmaInputStream =
+          new LZMACompressorInputStream(new ByteArrayInputStream(source, offset, length))) {
+        byte[] buffer = new byte[1024];
+        int bytesRead;
+        while ((bytesRead = lzmaInputStream.read(buffer)) != -1) {
+          outputStream.write(buffer, 0, bytesRead);
+        }
+      }
+      return outputStream.toByteArray();
+    }
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/compress/LZMACompressOverflowException.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/compress/LZMACompressOverflowException.java
new file mode 100644
index 0000000000..16cf245f5e
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/compress/LZMACompressOverflowException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.exception.compress;
+
+/**
+ * Unchecked exception whose fixed message reports that compressed data is larger than the byte
+ * container supplied to receive it.
+ */
+public class LZMACompressOverflowException extends RuntimeException {
+
+  /** Detail message shared by every instance. */
+  private static final String MESSAGE =
+      "compressed data is larger than the given byte container.";
+
+  /** Creates the exception with the fixed detail message and no cause. */
+  public LZMACompressOverflowException() {
+    super(MESSAGE);
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
index c78a84508a..c06cf94171 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
@@ -32,7 +32,8 @@ public enum CompressionType {
   // NOTICE: To ensure the compatibility of existing files, do not change the byte LZ4 binds to.
   LZ4(".lz4", (byte) 7),
   /** ZSTD */
-  ZSTD(".zstd", (byte) 8);
+  ZSTD(".zstd", (byte) 8),
+  LZMA(".lzma", (byte) 9);
 
   private final String extensionName;
   private final byte index;
@@ -60,6 +61,8 @@ public enum CompressionType {
         return CompressionType.LZ4;
       case 8:
         return CompressionType.ZSTD;
+      case 9:
+        return CompressionType.LZMA;
       default:
         throw new IllegalArgumentException("Invalid input: " + compressor);
     }