You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/11/26 07:27:29 UTC
[camel] branch main updated: CAMEL-18762: camel-hdfs - Add LZ4 and Zstd compression support (#8780)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new e21fa754461 CAMEL-18762: camel-hdfs - Add LZ4 and Zstd compression support (#8780)
e21fa754461 is described below
commit e21fa754461721a5f05b1f66b439ce58318eaf5a
Author: Kengo Seki <se...@apache.org>
AuthorDate: Sat Nov 26 16:27:23 2022 +0900
CAMEL-18762: camel-hdfs - Add LZ4 and Zstd compression support (#8780)
---
components/camel-hdfs/pom.xml | 6 ++
.../camel/component/hdfs/HdfsCompressionCodec.java | 16 +++++
.../camel/component/hdfs/HdfsProducerTest.java | 77 ++++++++++++++++++++++
3 files changed, 99 insertions(+)
diff --git a/components/camel-hdfs/pom.xml b/components/camel-hdfs/pom.xml
index 0e87f35feab..944f3456ecf 100644
--- a/components/camel-hdfs/pom.xml
+++ b/components/camel-hdfs/pom.xml
@@ -144,6 +144,12 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ <version>1.8.0</version>
+ <scope>test</scope>
+ </dependency>
<!-- test infra -->
<dependency>
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java
index a4d628f8c2b..5a22d3f6ff9 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java
@@ -20,7 +20,9 @@ import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.ZStandardCodec;
public enum HdfsCompressionCodec {
@@ -50,6 +52,20 @@ public enum HdfsCompressionCodec {
public CompressionCodec getCodec() {
return new SnappyCodec();
}
+ },
+
+ LZ4 {
+ @Override
+ public CompressionCodec getCodec() {
+ return new Lz4Codec();
+ }
+ },
+
+ ZSTANDARD {
+ @Override
+ public CompressionCodec getCodec() {
+ return new ZStandardCodec();
+ }
};
public abstract CompressionCodec getCodec();
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
index 577ac61b949..6dcb7609824 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
@@ -24,6 +24,7 @@ import java.net.URL;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.util.IOHelper;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -47,11 +48,14 @@ import org.junit.jupiter.api.Test;
import static org.apache.camel.language.simple.SimpleLanguage.simple;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
public class HdfsProducerTest extends HdfsTestSupport {
private static final Path TEMP_DIR = new Path(new File("target/test/").getAbsolutePath());
+ private static final boolean LD_LIBRARY_PATH_DEFINED = StringUtils.isNotBlank(System.getenv("LD_LIBRARY_PATH"));
+
@Override
@BeforeEach
public void setUp() throws Exception {
@@ -323,6 +327,24 @@ public class HdfsProducerTest extends HdfsTestSupport {
}
}
+ @Test
+ public void testCompressWithGZip() throws Exception {
+ assumeTrue(LD_LIBRARY_PATH_DEFINED);
+ byte aByte = 8;
+ template.sendBody("direct:gzip", aByte);
+
+ Configuration conf = new Configuration();
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-gzip");
+ SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file1));
+ Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ reader.next(key, value);
+ byte rByte = ((ByteWritable) value).get();
+ assertEquals(rByte, aByte);
+
+ IOHelper.close(reader);
+ }
+
@Test
public void testCompressWithBZip2() throws Exception {
byte aByte = 8;
@@ -357,6 +379,41 @@ public class HdfsProducerTest extends HdfsTestSupport {
IOHelper.close(reader);
}
+ @Test
+ public void testCompressWithLz4() throws Exception {
+ byte aByte = 8;
+ template.sendBody("direct:lz4", aByte);
+
+ Configuration conf = new Configuration();
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-lz4");
+ SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file1));
+ Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ reader.next(key, value);
+ byte rByte = ((ByteWritable) value).get();
+ assertEquals(rByte, aByte);
+
+ IOHelper.close(reader);
+ }
+
+ @Test
+ public void testCompressWithZStandard() throws Exception {
+ assumeTrue(LD_LIBRARY_PATH_DEFINED);
+ byte aByte = 8;
+ template.sendBody("direct:zstandard", aByte);
+
+ Configuration conf = new Configuration();
+ Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-zstandard");
+ SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file1));
+ Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ reader.next(key, value);
+ byte rByte = ((ByteWritable) value).get();
+ assertEquals(rByte, aByte);
+
+ IOHelper.close(reader);
+ }
+
@Override
@AfterEach
public void tearDown() throws Exception {
@@ -432,6 +489,26 @@ public class HdfsProducerTest extends HdfsTestSupport {
from("direct:snappy")
.to("hdfs:localhost/" + TEMP_DIR.toUri()
+ "/test-camel-snappy?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE&compressionCodec=SNAPPY&compressionType=BLOCK");
+
+ from("direct:lz4")
+ .to("hdfs:localhost/" + TEMP_DIR.toUri()
+ + "/test-camel-lz4?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE&compressionCodec=LZ4&compressionType=BLOCK");
+
+ // GZip and ZStandard require the native hadoop library. To run these tests,
+ // 1. install shared libraries for these codecs (e.g., libz.so and libzstd.so on Linux)
+ // 2. download pre-built native hadoop library, or build it yourself in accordance with
+ // https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/NativeLibraries.html
+ // 3. set LD_LIBRARY_PATH to point to the native hadoop library when running tests, like
+ // `$ LD_LIBRARY_PATH=/path/to/hadoop/lib/native ./mvnw clean test -f components/camel-hdfs`
+ if (LD_LIBRARY_PATH_DEFINED) {
+ from("direct:gzip")
+ .to("hdfs:localhost/" + TEMP_DIR.toUri()
+ + "/test-camel-gzip?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE&compressionCodec=GZIP&compressionType=BLOCK");
+
+ from("direct:zstandard")
+ .to("hdfs:localhost/" + TEMP_DIR.toUri()
+ + "/test-camel-zstandard?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE&compressionCodec=ZSTANDARD&compressionType=BLOCK");
+ }
}
};
}