You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/04/07 03:27:42 UTC
[incubator-inlong] branch master updated: [INLONG-3523][Sort] Remove lzo-core dependency (#3559)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9faf490d9 [INLONG-3523][Sort] Remove lzo-core dependency (#3559)
9faf490d9 is described below
commit 9faf490d93715ab86617c7a7423e2e76a3aa6078
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Thu Apr 7 11:27:38 2022 +0800
[INLONG-3523][Sort] Remove lzo-core dependency (#3559)
Co-authored-by: tianqiwan <ti...@tencent.com>
---
inlong-sort/pom.xml | 7 -----
inlong-sort/sort-connectors/pom.xml | 5 ----
.../sort/flink/hive/formats/TextRowWriter.java | 8 +-----
.../sort/flink/hive/formats/TextRowWriterTest.java | 33 ----------------------
4 files changed, 1 insertion(+), 52 deletions(-)
diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index 498702c5d..3871c74e9 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -61,7 +61,6 @@
<hive.version>2.3.8</hive.version>
<pulsar.version>2.8.1</pulsar.version>
<kafka.version>2.4.1</kafka.version>
- <lzo.core.version>1.0.6</lzo.core.version>
</properties>
<modules>
@@ -410,12 +409,6 @@
<version>${clickhouse-jdbc.version}</version>
</dependency>
- <dependency>
- <groupId>org.anarres.lzo</groupId>
- <artifactId>lzo-core</artifactId>
- <version>${lzo.core.version}</version>
- </dependency>
-
</dependencies>
</dependencyManagement>
diff --git a/inlong-sort/sort-connectors/pom.xml b/inlong-sort/sort-connectors/pom.xml
index 02dd09871..1fda70f4d 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -88,11 +88,6 @@
<artifactId>hadoop-common</artifactId>
</dependency>
- <dependency>
- <groupId>org.anarres.lzo</groupId>
- <artifactId>lzo-core</artifactId>
- </dependency>
-
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/TextRowWriter.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/TextRowWriter.java
index 70e975cda..877c12961 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/TextRowWriter.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/TextRowWriter.java
@@ -26,10 +26,6 @@ import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.zip.GZIPOutputStream;
-import org.anarres.lzo.LzoAlgorithm;
-import org.anarres.lzo.LzoCompressor;
-import org.anarres.lzo.LzoLibrary;
-import org.anarres.lzo.LzopOutputStream;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.shaded.guava18.com.google.common.annotations.VisibleForTesting;
@@ -295,9 +291,7 @@ public class TextRowWriter implements BulkWriter<Row> {
case GZIP:
return new GZIPOutputStream(outputStream, bufferSize, false);
case LZO:
- LzoAlgorithm algorithm = LzoAlgorithm.LZO1X;
- LzoCompressor compressor = LzoLibrary.getInstance().newCompressor(algorithm, null);
- return new LzopOutputStream(outputStream, compressor, bufferSize);
+ throw new IllegalArgumentException("LZO compression is not supported yet!");
default:
// TODO, should be wrapped with a buffered stream? we need a performance testing
return outputStream;
diff --git a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/TextRowWriterTest.java b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/TextRowWriterTest.java
index 71d09d74b..3b35a1a4a 100644
--- a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/TextRowWriterTest.java
+++ b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/TextRowWriterTest.java
@@ -20,7 +20,6 @@ package org.apache.inlong.sort.flink.hive.formats;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import io.airlift.compress.lzo.LzopCodec;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@@ -42,7 +41,6 @@ import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.types.Row;
-import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.inlong.sort.configuration.Configuration;
import org.apache.inlong.sort.configuration.Constants.CompressionType;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.TextFileFormat;
@@ -102,37 +100,6 @@ public class TextRowWriterTest {
assertTrue(isSameFile(gzipFile.getAbsolutePath(), "src/test/resources/testGzip.gz"));
}
- @Test
- public void testWriteLZO() throws IOException {
- File lzoFile = temporaryFolder.newFile("test.lzo");
- TextRowWriter textRowWriter = new TextRowWriter(
- new LocalDataOutputStream(lzoFile),
- new TextFileFormat(',', CompressionType.LZO),
- new Configuration(),
- new LogicalType[] {new CharType(), new IntType()}
- );
-
- textRowWriter.addElement(Row.of("zhangsan", 1));
- textRowWriter.addElement(Row.of("lisi", 2));
- textRowWriter.finish();
-
- LzopCodec lzopCodec = new LzopCodec();
- CompressionInputStream compressionInputStream = lzopCodec.createInputStream(
- new FileInputStream(lzoFile.getAbsolutePath()));
- final List<String> results = new ArrayList<>();
- try (BufferedReader br = new BufferedReader(new InputStreamReader(
- compressionInputStream, StandardCharsets.UTF_8))) {
- String line;
- while ((line = br.readLine()) != null) {
- results.add(line);
- }
- }
-
- assertEquals(2, results.size());
- assertEquals("zhangsan,1", results.get(0));
- assertEquals("lisi,2", results.get(1));
- }
-
public static boolean isSameFile(String fileName1, String fileName2) {
try (FileInputStream fis1 = new FileInputStream(fileName1);
FileInputStream fis2 = new FileInputStream(fileName2)) {