Posted to commits@camel.apache.org by da...@apache.org on 2022/11/24 06:39:20 UTC

[camel] branch main updated: CAMEL-18749: camel-hdfs: Add Snappy compression support (#8767)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 48c1b4dc270 CAMEL-18749: camel-hdfs: Add Snappy compression support (#8767)
48c1b4dc270 is described below

commit 48c1b4dc2701cb3c94c2123bc81b79526215d55b
Author: Kengo Seki <se...@apache.org>
AuthorDate: Thu Nov 24 15:39:14 2022 +0900

    CAMEL-18749: camel-hdfs: Add Snappy compression support (#8767)
    
    camel-hdfs currently supports gzip and bzip2 compression,
    but HDFS itself supports some other codecs.
    This PR adds Snappy compression support to the producer.
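    
    As a usage sketch (the endpoint path and route name below are illustrative,
    not part of this commit; the option names follow the test routes added in
    the diff), a route can enable the new codec through the endpoint URI:
    
        from("direct:toHdfs")
            .to("hdfs:localhost/tmp/test-snappy"
                + "?fileType=SEQUENCE_FILE&valueType=BYTE"
                + "&compressionCodec=SNAPPY&compressionType=BLOCK");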
---
 .../camel/component/hdfs/HdfsCompressionCodec.java |  8 ++++
 .../camel/component/hdfs/HdfsProducerTest.java     | 43 ++++++++++++++++++++++
 2 files changed, 51 insertions(+)

diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java
index 1dbcef1fcdf..a4d628f8c2b 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java
@@ -20,6 +20,7 @@ import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.SnappyCodec;
 
 public enum HdfsCompressionCodec {
 
@@ -42,6 +43,13 @@ public enum HdfsCompressionCodec {
         public CompressionCodec getCodec() {
             return new BZip2Codec();
         }
+    },
+
+    SNAPPY {
+        @Override
+        public CompressionCodec getCodec() {
+            return new SnappyCodec();
+        }
     };
 
     public abstract CompressionCodec getCodec();
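
For reference, the new enum constant can also be used to obtain the Hadoop codec
programmatically. A minimal sketch (the class name is illustrative; printing the
default file extension is only meant to show the codec is resolvable):

    import org.apache.camel.component.hdfs.HdfsCompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodec;

    public class SnappyCodecSketch {
        public static void main(String[] args) {
            // Per the diff above, SNAPPY.getCodec() returns a new SnappyCodec.
            CompressionCodec codec = HdfsCompressionCodec.SNAPPY.getCodec();
            System.out.println(codec.getDefaultExtension()); // prints ".snappy"
        }
    }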
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
index d055660fbd7..577ac61b949 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
@@ -323,6 +323,40 @@ public class HdfsProducerTest extends HdfsTestSupport {
         }
     }
 
+    @Test
+    public void testCompressWithBZip2() throws Exception {
+        byte aByte = 8;
+        template.sendBody("direct:bzip2", aByte);
+
+        Configuration conf = new Configuration();
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-bzip2");
+        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file1));
+        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+        Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+        reader.next(key, value);
+        byte rByte = ((ByteWritable) value).get();
+        assertEquals(rByte, aByte);
+
+        IOHelper.close(reader);
+    }
+
+    @Test
+    public void testCompressWithSnappy() throws Exception {
+        byte aByte = 8;
+        template.sendBody("direct:snappy", aByte);
+
+        Configuration conf = new Configuration();
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-snappy");
+        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file1));
+        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+        Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+        reader.next(key, value);
+        byte rByte = ((ByteWritable) value).get();
+        assertEquals(rByte, aByte);
+
+        IOHelper.close(reader);
+    }
+
     @Override
     @AfterEach
     public void tearDown() throws Exception {
@@ -389,6 +423,15 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
                 from("direct:write_dynamic_filename")
                         .to("hdfs:localhost/" + TEMP_DIR.toUri() + "/test-camel-dynamic/?fileSystemType=LOCAL&valueType=TEXT");
+
+                /* For testing compression codecs */
+                from("direct:bzip2")
+                        .to("hdfs:localhost/" + TEMP_DIR.toUri()
+                            + "/test-camel-bzip2?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE&compressionCodec=BZIP2&compressionType=BLOCK");
+
+                from("direct:snappy")
+                        .to("hdfs:localhost/" + TEMP_DIR.toUri()
+                            + "/test-camel-snappy?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE&compressionCodec=SNAPPY&compressionType=BLOCK");
             }
         };
     }