You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/11/26 07:27:29 UTC

[camel] branch main updated: CAMEL-18762: camel-hdfs - Add LZ4 and Zstd compression support (#8780)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new e21fa754461 CAMEL-18762: camel-hdfs - Add LZ4 and Zstd compression support (#8780)
e21fa754461 is described below

commit e21fa754461721a5f05b1f66b439ce58318eaf5a
Author: Kengo Seki <se...@apache.org>
AuthorDate: Sat Nov 26 16:27:23 2022 +0900

    CAMEL-18762: camel-hdfs - Add LZ4 and Zstd compression support (#8780)
---
 components/camel-hdfs/pom.xml                      |  6 ++
 .../camel/component/hdfs/HdfsCompressionCodec.java | 16 +++++
 .../camel/component/hdfs/HdfsProducerTest.java     | 77 ++++++++++++++++++++++
 3 files changed, 99 insertions(+)

diff --git a/components/camel-hdfs/pom.xml b/components/camel-hdfs/pom.xml
index 0e87f35feab..944f3456ecf 100644
--- a/components/camel-hdfs/pom.xml
+++ b/components/camel-hdfs/pom.xml
@@ -144,6 +144,12 @@
             <artifactId>log4j-slf4j-impl</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.lz4</groupId>
+            <artifactId>lz4-java</artifactId>
+            <version>1.8.0</version>
+            <scope>test</scope>
+        </dependency>
 
         <!-- test infra -->
         <dependency>
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java
index a4d628f8c2b..5a22d3f6ff9 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java
@@ -20,7 +20,9 @@ import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
 import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.ZStandardCodec;
 
 public enum HdfsCompressionCodec {
 
@@ -50,6 +52,20 @@ public enum HdfsCompressionCodec {
         public CompressionCodec getCodec() {
             return new SnappyCodec();
         }
+    },
+
+    LZ4 {
+        @Override
+        public CompressionCodec getCodec() {
+            return new Lz4Codec();
+        }
+    },
+
+    ZSTANDARD {
+        @Override
+        public CompressionCodec getCodec() {
+            return new ZStandardCodec();
+        }
     };
 
     public abstract CompressionCodec getCodec();
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
index 577ac61b949..6dcb7609824 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
@@ -24,6 +24,7 @@ import java.net.URL;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.util.IOHelper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -47,11 +48,14 @@ import org.junit.jupiter.api.Test;
 
 import static org.apache.camel.language.simple.SimpleLanguage.simple;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 public class HdfsProducerTest extends HdfsTestSupport {
 
     private static final Path TEMP_DIR = new Path(new File("target/test/").getAbsolutePath());
 
+    private static final boolean LD_LIBRARY_PATH_DEFINED = StringUtils.isNotBlank(System.getenv("LD_LIBRARY_PATH"));
+
     @Override
     @BeforeEach
     public void setUp() throws Exception {
@@ -323,6 +327,24 @@ public class HdfsProducerTest extends HdfsTestSupport {
         }
     }
 
+    @Test
+    public void testCompressWithGZip() throws Exception {
+        assumeTrue(LD_LIBRARY_PATH_DEFINED);
+        byte aByte = 8;
+        template.sendBody("direct:gzip", aByte);
+
+        Configuration conf = new Configuration();
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-gzip");
+        // Read the written byte back; try-with-resources closes the reader even if a check fails.
+        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file1))) {
+            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+            reader.next(key, value);
+            byte rByte = ((ByteWritable) value).get();
+            assertEquals(aByte, rByte);
+        }
+    }
+
     @Test
     public void testCompressWithBZip2() throws Exception {
         byte aByte = 8;
@@ -357,6 +379,41 @@ public class HdfsProducerTest extends HdfsTestSupport {
         IOHelper.close(reader);
     }
 
+    @Test
+    public void testCompressWithLz4() throws Exception {
+        byte aByte = 8;
+        template.sendBody("direct:lz4", aByte);
+
+        Configuration conf = new Configuration();
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-lz4");
+        // Read the written byte back; try-with-resources closes the reader even if a check fails.
+        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file1))) {
+            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+            reader.next(key, value);
+            byte rByte = ((ByteWritable) value).get();
+            assertEquals(aByte, rByte);
+        }
+    }
+
+    @Test
+    public void testCompressWithZStandard() throws Exception {
+        assumeTrue(LD_LIBRARY_PATH_DEFINED);
+        byte aByte = 8;
+        template.sendBody("direct:zstandard", aByte);
+
+        Configuration conf = new Configuration();
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-zstandard");
+        // Read the written byte back; try-with-resources closes the reader even if a check fails.
+        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file1))) {
+            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+            reader.next(key, value);
+            byte rByte = ((ByteWritable) value).get();
+            assertEquals(aByte, rByte);
+        }
+    }
+
     @Override
     @AfterEach
     public void tearDown() throws Exception {
@@ -432,6 +489,26 @@ public class HdfsProducerTest extends HdfsTestSupport {
                 from("direct:snappy")
                         .to("hdfs:localhost/" + TEMP_DIR.toUri()
                             + "/test-camel-snappy?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE&compressionCodec=SNAPPY&compressionType=BLOCK");
+
+                from("direct:lz4")
+                        .to("hdfs:localhost/" + TEMP_DIR.toUri()
+                            + "/test-camel-lz4?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE&compressionCodec=LZ4&compressionType=BLOCK");
+
+                // GZip and ZStandard require the native Hadoop library. To run these tests:
+                // 1. install the shared libraries for these codecs (e.g., libz.so and libzstd.so on Linux)
+                // 2. download a pre-built native Hadoop library, or build it yourself in accordance with
+                //    https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/NativeLibraries.html
+                // 3. set LD_LIBRARY_PATH to point to the native Hadoop library when running the tests, e.g.
+                //    `$ LD_LIBRARY_PATH=/path/to/hadoop/lib/native ./mvnw clean test -f components/camel-hdfs`
+                if (LD_LIBRARY_PATH_DEFINED) {
+                    from("direct:gzip")
+                            .to("hdfs:localhost/" + TEMP_DIR.toUri()
+                                + "/test-camel-gzip?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE&compressionCodec=GZIP&compressionType=BLOCK");
+
+                    from("direct:zstandard")
+                            .to("hdfs:localhost/" + TEMP_DIR.toUri()
+                                + "/test-camel-zstandard?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE&compressionCodec=ZSTANDARD&compressionType=BLOCK");
+                }
             }
         };
     }