You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/11/24 06:39:20 UTC
[camel] branch main updated: CAMEL-18749: camel-hdfs: Add Snappy compression support (#8767)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 48c1b4dc270 CAMEL-18749: camel-hdfs: Add Snappy compression support (#8767)
48c1b4dc270 is described below
commit 48c1b4dc2701cb3c94c2123bc81b79526215d55b
Author: Kengo Seki <se...@apache.org>
AuthorDate: Thu Nov 24 15:39:14 2022 +0900
CAMEL-18749: camel-hdfs: Add Snappy compression support (#8767)
camel-hdfs currently supports gzip and bzip2 compression,
but HDFS itself supports some other codecs.
This PR adds Snappy compression support to the producer.
---
.../camel/component/hdfs/HdfsCompressionCodec.java | 8 ++++
.../camel/component/hdfs/HdfsProducerTest.java | 43 ++++++++++++++++++++++
2 files changed, 51 insertions(+)
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java
index 1dbcef1fcdf..a4d628f8c2b 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsCompressionCodec.java
@@ -20,6 +20,7 @@ import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.SnappyCodec;
public enum HdfsCompressionCodec {
@@ -42,6 +43,13 @@ public enum HdfsCompressionCodec {
public CompressionCodec getCodec() {
return new BZip2Codec();
}
+ },
+
+ SNAPPY {
+ @Override
+ public CompressionCodec getCodec() {
+ return new SnappyCodec(); // a new Hadoop SnappyCodec per call, same pattern as the BZIP2 constant
+ }
};
public abstract CompressionCodec getCodec();
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
index d055660fbd7..577ac61b949 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
@@ -323,6 +323,40 @@ public class HdfsProducerTest extends HdfsTestSupport {
}
}
+ // Sends one byte to direct:bzip2, then reads the written SequenceFile back and checks the value round-trips.
+ @Test
+ public void testCompressWithBZip2() throws Exception {
+     byte aByte = 8;
+     template.sendBody("direct:bzip2", aByte);
+
+     Configuration conf = new Configuration();
+     Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-bzip2");
+     try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file1))) { // closed even on failure
+         Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+         Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+         reader.next(key, value);
+         byte rByte = ((ByteWritable) value).get();
+         assertEquals(aByte, rByte); // JUnit order: expected first, then actual
+     }
+ }
+
+ // Sends one byte to direct:snappy, then reads the written SequenceFile back and checks the value round-trips.
+ @Test
+ public void testCompressWithSnappy() throws Exception {
+     byte aByte = 8;
+     template.sendBody("direct:snappy", aByte);
+
+     Configuration conf = new Configuration();
+     Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-snappy");
+     try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file1))) { // closed even on failure
+         Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+         Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+         reader.next(key, value);
+         byte rByte = ((ByteWritable) value).get();
+         assertEquals(aByte, rByte); // JUnit order: expected first, then actual
+     }
+ }
+
@Override
@AfterEach
public void tearDown() throws Exception {
@@ -389,6 +423,15 @@ public class HdfsProducerTest extends HdfsTestSupport {
from("direct:write_dynamic_filename")
.to("hdfs:localhost/" + TEMP_DIR.toUri() + "/test-camel-dynamic/?fileSystemType=LOCAL&valueType=TEXT");
+
+ /* For testing compression codecs */
+ from("direct:bzip2")
+ .to("hdfs:localhost/" + TEMP_DIR.toUri()
+ + "/test-camel-bzip2?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE&compressionCodec=BZIP2&compressionType=BLOCK");
+
+ from("direct:snappy")
+ .to("hdfs:localhost/" + TEMP_DIR.toUri()
+ + "/test-camel-snappy?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE&compressionCodec=SNAPPY&compressionType=BLOCK");
}
};
}