You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/03/31 16:09:07 UTC
[iotdb] 01/01: add LZMA compression
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch add-lzma-compress
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit acd534a5400f0d199eaa0f945da81c2312d4d309
Author: LiuXuxin <li...@outlook.com>
AuthorDate: Sat Apr 1 00:08:36 2023 +0800
add LZMA compression
---
.../main/java/org/apache/iotdb/SessionExample.java | 121 ++++++---------------
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
tsfile/pom.xml | 10 ++
.../apache/iotdb/tsfile/compress/ICompressor.java | 68 ++++++++++++
.../iotdb/tsfile/compress/IUnCompressor.java | 62 +++++++++++
.../compress/LZMACompressOverflowException.java | 26 +++++
.../file/metadata/enums/CompressionType.java | 5 +-
7 files changed, 203 insertions(+), 91 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 953b7fd1da..1f9b80521e 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.isession.template.Template;
import org.apache.iotdb.isession.util.Version;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.template.MeasurementNode;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -70,51 +69,8 @@ public class SessionExample {
.version(Version.V_1_0)
.build();
session.open(false);
-
- // set session fetchSize
- session.setFetchSize(10000);
-
- try {
- session.createDatabase("root.sg1");
- } catch (StatementExecutionException e) {
- if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
- throw e;
- }
- }
-
- // createTemplate();
- createTimeseries();
- createMultiTimeseries();
- insertRecord();
insertTablet();
- // insertTabletWithNullValues();
- // insertTablets();
- // insertRecords();
- // insertText();
- // selectInto();
- // createAndDropContinuousQueries();
- // nonQuery();
- query();
- // queryWithTimeout();
- rawDataQuery();
- lastDataQuery();
- aggregationQuery();
- groupByQuery();
- // queryByIterator();
- // deleteData();
- // deleteTimeseries();
- // setTimeout();
-
- sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
- sessionEnableRedirect.setEnableQueryRedirection(true);
- sessionEnableRedirect.open(false);
-
- // set session fetchSize
- sessionEnableRedirect.setFetchSize(10000);
-
- insertRecord4Redirect();
- query4Redirect();
- sessionEnableRedirect.close();
+
session.close();
}
@@ -386,58 +342,45 @@ public class SessionExample {
*/
// The schema of measurements of one device
// only measurementId and data type in MeasurementSchema take effects in Tablet
+ long startTime = System.currentTimeMillis();
List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
- schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
-
- Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList, 100);
-
- // Method 1 to add tablet data
- long timestamp = System.currentTimeMillis();
-
- for (long row = 0; row < 100; row++) {
- int rowIndex = tablet.rowSize++;
- tablet.addTimestamp(rowIndex, timestamp);
- for (int s = 0; s < 3; s++) {
- long value = new Random().nextLong();
- tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
- tablet.reset();
+ for (int i = 1; i <= 100; i++) {
+ schemaList.add(
+ new MeasurementSchema(
+ "s" + i, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.SNAPPY));
+ }
+ Random random = new Random();
+ for (int i = 1; i <= 1000; i++) {
+ String device = "root.sg.d" + i;
+ Tablet tablet = new Tablet(device, schemaList, 100);
+ // Method 1 to add tablet data
+ long timestamp = System.currentTimeMillis();
+
+ for (long row = 0; row < 10000; row++) {
+ int rowIndex = tablet.rowSize++;
+ tablet.addTimestamp(rowIndex, timestamp);
+ for (int s = 0; s < 100; s++) {
+ long value = Math.abs(random.nextLong() % 1000);
+ tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
+ }
+ if (tablet.rowSize == tablet.getMaxRowNumber()) {
+ session.insertTablet(tablet, true);
+ tablet.reset();
+ }
+ timestamp++;
}
- timestamp++;
- }
-
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
- tablet.reset();
- }
- // Method 2 to add tablet data
- long[] timestamps = tablet.timestamps;
- Object[] values = tablet.values;
-
- for (long time = 0; time < 100; time++) {
- int row = tablet.rowSize++;
- timestamps[row] = time;
- for (int i = 0; i < 3; i++) {
- long[] sensor = (long[]) values[i];
- sensor[row] = i;
- }
- if (tablet.rowSize == tablet.getMaxRowNumber()) {
- session.insertTablet(tablet, true);
+ if (tablet.rowSize != 0) {
+ session.insertTablet(tablet);
tablet.reset();
}
+ System.out.println(i);
}
-
- if (tablet.rowSize != 0) {
- session.insertTablet(tablet);
- tablet.reset();
- }
+ System.out.println("Time Cost: " + (System.currentTimeMillis() - startTime));
}
+  /**
+   * Produces a short pseudo-random string for use as sample data.
+   *
+   * @return the lower-case hexadecimal form of a random {@code long} (1 to 16 characters)
+   */
+  public static String getRandomString() {
+    // The original stub had an empty body, which does not compile for a non-void method.
+    return Long.toHexString(new Random().nextLong());
+  }
+
private static void insertTabletWithNullValues()
throws IoTDBConnectionException, StatementExecutionException {
/*
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b0f97c2288..cdc69fe2e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -397,7 +397,7 @@ public class IoTDBConfig {
private int avgSeriesPointNumberThreshold = 100000;
/** Enable inner space compaction for sequence files */
- private boolean enableSeqSpaceCompaction = true;
+ private boolean enableSeqSpaceCompaction = false;
/** Enable inner space compaction for unsequence files */
private boolean enableUnseqSpaceCompaction = true;
diff --git a/tsfile/pom.xml b/tsfile/pom.xml
index e62c393878..5c1e65ad04 100644
--- a/tsfile/pom.xml
+++ b/tsfile/pom.xml
@@ -104,6 +104,16 @@
<artifactId>iotdb-antlr</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.23.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.tukaani</groupId>
+ <artifactId>xz</artifactId>
+ <version>1.9</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java
index 84654e2a00..217941da7a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.tsfile.compress;
import org.apache.iotdb.tsfile.exception.compress.CompressionTypeNotSupportedException;
import org.apache.iotdb.tsfile.exception.compress.GZIPCompressOverflowException;
+import org.apache.iotdb.tsfile.exception.compress.LZMACompressOverflowException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import com.github.luben.zstd.Zstd;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
+import org.apache.commons.compress.compressors.lzma.LZMACompressorOutputStream;
import org.xerial.snappy.Snappy;
import java.io.ByteArrayInputStream;
@@ -38,6 +40,7 @@ import java.util.zip.GZIPOutputStream;
import static org.apache.iotdb.tsfile.file.metadata.enums.CompressionType.GZIP;
import static org.apache.iotdb.tsfile.file.metadata.enums.CompressionType.LZ4;
+import static org.apache.iotdb.tsfile.file.metadata.enums.CompressionType.LZMA;
import static org.apache.iotdb.tsfile.file.metadata.enums.CompressionType.SNAPPY;
import static org.apache.iotdb.tsfile.file.metadata.enums.CompressionType.ZSTD;
@@ -69,6 +72,8 @@ public interface ICompressor extends Serializable {
return new GZIPCompressor();
case ZSTD:
return new ZstdCompressor();
+ case LZMA:
+ return new LzmaCompression();
default:
throw new CompressionTypeNotSupportedException(name.toString());
}
@@ -370,4 +375,67 @@ public interface ICompressor extends Serializable {
return ZSTD;
}
}
+
+  /**
+   * {@link ICompressor} that encodes data with commons-compress'
+   * {@link LZMACompressorOutputStream}.
+   *
+   * <p>Each call produces one self-contained LZMA stream held entirely in memory; the instance
+   * itself keeps no state between calls.
+   */
+  class LzmaCompression implements ICompressor {
+
+    /**
+     * Compresses the whole array.
+     *
+     * @param data bytes to compress
+     * @return a newly allocated array holding the LZMA stream
+     * @throws IOException if the encoder fails
+     */
+    @Override
+    public byte[] compress(byte[] data) throws IOException {
+      return compress(data, 0, data.length);
+    }
+
+    /**
+     * Compresses {@code length} bytes of {@code data} starting at {@code offset}.
+     *
+     * @param data source array
+     * @param offset index of the first byte to compress
+     * @param length number of bytes to compress
+     * @return a newly allocated array holding the LZMA stream
+     * @throws IOException if the encoder fails
+     */
+    @Override
+    public byte[] compress(byte[] data, int offset, int length) throws IOException {
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      // try-with-resources closes (and thereby finishes) the encoder even when write() throws;
+      // the range is handed to the encoder directly instead of being copied first.
+      try (LZMACompressorOutputStream lzmaOutputStream =
+          new LZMACompressorOutputStream(outputStream)) {
+        lzmaOutputStream.write(data, offset, length);
+      }
+      // Only read the sink after the encoder has been closed, otherwise the stream is incomplete.
+      return outputStream.toByteArray();
+    }
+
+    /**
+     * Compresses a range of {@code data} into the beginning of {@code compressed}.
+     *
+     * @param data source array
+     * @param offset index of the first byte to compress
+     * @param length number of bytes to compress
+     * @param compressed destination array, filled from index 0
+     * @return number of bytes written to {@code compressed}
+     * @throws IOException if the encoder fails
+     * @throws LZMACompressOverflowException if the result does not fit into {@code compressed}
+     */
+    @Override
+    public int compress(byte[] data, int offset, int length, byte[] compressed) throws IOException {
+      byte[] res = compress(data, offset, length);
+      if (res.length > compressed.length) {
+        throw new LZMACompressOverflowException();
+      }
+      System.arraycopy(res, 0, compressed, 0, res.length);
+      return res.length;
+    }
+
+    /**
+     * Compresses the remaining bytes of {@code data} and appends the result to {@code compressed}.
+     * Both buffers' positions advance by the number of bytes consumed and produced.
+     *
+     * @param data source buffer; everything between position and limit is consumed
+     * @param compressed destination buffer, written at its current position
+     * @return number of bytes written to {@code compressed}
+     * @throws IOException if the encoder fails
+     * @throws LZMACompressOverflowException if the result exceeds the space left in
+     *     {@code compressed}
+     */
+    @Override
+    public int compress(ByteBuffer data, ByteBuffer compressed) throws IOException {
+      byte[] dataBefore = new byte[data.remaining()];
+      data.get(dataBefore);
+      byte[] res = compress(dataBefore);
+      // put() writes at the current position, so the space that matters is remaining(), not
+      // capacity(); checking capacity() let a partly filled buffer fail with
+      // BufferOverflowException instead of the dedicated overflow exception.
+      if (res.length > compressed.remaining()) {
+        throw new LZMACompressOverflowException();
+      }
+      compressed.put(res);
+      return res.length;
+    }
+
+    /**
+     * Returns the destination size callers should reserve for {@code uncompressedDataSize} input
+     * bytes: the input size plus a fixed 4 KiB.
+     *
+     * <p>NOTE(review): the fixed head-room is an assumption about LZMA's worst-case expansion on
+     * incompressible input - confirm it holds for the largest buffers passed in.
+     */
+    @Override
+    public int getMaxBytesForCompression(int uncompressedDataSize) {
+      return uncompressedDataSize + 1024 * 4;
+    }
+
+    @Override
+    public CompressionType getType() {
+      return LZMA;
+    }
+  }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java
index d12985d759..d583eff2cf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java
@@ -26,10 +26,13 @@ import com.github.luben.zstd.Zstd;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;
+import org.apache.commons.compress.compressors.lzma.LZMACompressorInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -356,4 +359,63 @@ public interface IUnCompressor {
return CompressionType.ZSTD;
}
}
+
+  /**
+   * {@link IUnCompressor} that decodes LZMA streams with commons-compress'
+   * {@link LZMACompressorInputStream}.
+   *
+   * <p>The uncompressed length cannot be queried up front; both {@code getUncompressedLength}
+   * overloads throw {@link UnsupportedOperationException}.
+   *
+   * <p>NOTE(review): this patch shows no LZMA branch being added to the uncompressor factory
+   * switch - confirm that readers can actually obtain an instance of this class.
+   */
+  class LzmaUnCompressor implements IUnCompressor {
+
+    /** Chunk size used while draining the decoder. */
+    private static final int BUFFER_SIZE = 1024;
+
+    @Override
+    public int getUncompressedLength(byte[] array, int offset, int length) throws IOException {
+      throw new UnsupportedOperationException("unsupported get uncompress length");
+    }
+
+    @Override
+    public int getUncompressedLength(ByteBuffer buffer) throws IOException {
+      throw new UnsupportedOperationException("unsupported get uncompress length");
+    }
+
+    /**
+     * Decodes a complete LZMA stream.
+     *
+     * @param byteArray the compressed bytes
+     * @return a newly allocated array with the decoded bytes
+     * @throws IOException if the stream cannot be decoded
+     */
+    @Override
+    public byte[] uncompress(byte[] byteArray) throws IOException {
+      return decode(byteArray, 0, byteArray.length);
+    }
+
+    /**
+     * Decodes {@code length} bytes of {@code byteArray} starting at {@code offset} and copies the
+     * result into {@code output} starting at {@code outOffset}.
+     *
+     * @return number of decoded bytes written to {@code output}
+     * @throws IOException if the stream cannot be decoded
+     */
+    @Override
+    public int uncompress(byte[] byteArray, int offset, int length, byte[] output, int outOffset)
+        throws IOException {
+      byte[] res = decode(byteArray, offset, length);
+      System.arraycopy(res, 0, output, outOffset, res.length);
+      return res.length;
+    }
+
+    /**
+     * Decodes the remaining bytes of {@code compressed} and appends the result to
+     * {@code uncompressed}. Both buffers' positions advance accordingly.
+     *
+     * @return number of decoded bytes written to {@code uncompressed}
+     * @throws IOException if the stream cannot be decoded
+     */
+    @Override
+    public int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException {
+      byte[] dataBefore = new byte[compressed.remaining()];
+      compressed.get(dataBefore);
+
+      byte[] res = decode(dataBefore, 0, dataBefore.length);
+      uncompressed.put(res);
+
+      return res.length;
+    }
+
+    @Override
+    public CompressionType getCodecName() {
+      return CompressionType.LZMA;
+    }
+
+    /** Decodes the LZMA stream stored in {@code src[offset, offset + length)}. */
+    private static byte[] decode(byte[] src, int offset, int length) throws IOException {
+      // ByteArrayInputStream silently clamps an out-of-range window, so validate explicitly to
+      // keep failing fast on bad arguments.
+      if (offset < 0 || length < 0 || length > src.length - offset) {
+        throw new ArrayIndexOutOfBoundsException(
+            "offset=" + offset + ", length=" + length + ", array length=" + src.length);
+      }
+      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+      // try-with-resources releases the decoder even when the input is corrupt and read() throws.
+      try (LZMACompressorInputStream lzmaInputStream =
+          new LZMACompressorInputStream(new ByteArrayInputStream(src, offset, length))) {
+        byte[] buffer = new byte[BUFFER_SIZE];
+        int bytesRead;
+        while ((bytesRead = lzmaInputStream.read(buffer)) != -1) {
+          outputStream.write(buffer, 0, bytesRead);
+        }
+      }
+      return outputStream.toByteArray();
+    }
+  }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/compress/LZMACompressOverflowException.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/compress/LZMACompressOverflowException.java
new file mode 100644
index 0000000000..16cf245f5e
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/exception/compress/LZMACompressOverflowException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.exception.compress;
+
+/** Raised when LZMA-compressed data is larger than the byte container supplied for it. */
+public class LZMACompressOverflowException extends RuntimeException {
+
+  private static final String MESSAGE =
+      "compressed data is larger than the given byte container.";
+
+  /** Creates the exception with its fixed detail message. */
+  public LZMACompressOverflowException() {
+    super(MESSAGE);
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
index c78a84508a..c06cf94171 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
@@ -32,7 +32,8 @@ public enum CompressionType {
// NOTICE: To ensure the compatibility of existing files, do not change the byte LZ4 binds to.
LZ4(".lz4", (byte) 7),
/** ZSTD */
- ZSTD(".zstd", (byte) 8);
+ ZSTD(".zstd", (byte) 8),
+ LZMA(".lzma", (byte) 9);
private final String extensionName;
private final byte index;
@@ -60,6 +61,8 @@ public enum CompressionType {
return CompressionType.LZ4;
case 8:
return CompressionType.ZSTD;
+ case 9:
+ return CompressionType.LZMA;
default:
throw new IllegalArgumentException("Invalid input: " + compressor);
}