You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/03/29 04:00:34 UTC

[camel] branch camel-3.x updated: CAMEL-19214: camel-hdfs - Add ShortWritable support to the HDFS component (#9669)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.x by this push:
     new 6c762799853 CAMEL-19214: camel-hdfs - Add ShortWritable support to the HDFS component (#9669)
6c762799853 is described below

commit 6c762799853d3dbba5d4a14302624fab563b2f53
Author: Kengo Seki <se...@apache.org>
AuthorDate: Wed Mar 29 12:59:58 2023 +0900

    CAMEL-19214: camel-hdfs - Add ShortWritable support to the HDFS component (#9669)
    
    The HDFS component supports most of Hadoop Writables corresponding to
    Java primary types, but it only doesn't support ShortWritable for some
    reason. This PR adds that support to the HDFS component.
---
 .../camel-hdfs/src/main/docs/hdfs-component.adoc    |  1 +
 .../camel/component/hdfs/DefaultHdfsFile.java       |  3 +++
 .../camel/component/hdfs/HdfsWritableFactories.java | 20 ++++++++++++++++++++
 .../apache/camel/component/hdfs/WritableType.java   |  8 ++++++++
 .../camel/component/hdfs/HdfsProducerTest.java      | 21 +++++++++++++++++++++
 5 files changed, 53 insertions(+)

diff --git a/components/camel-hdfs/src/main/docs/hdfs-component.adoc b/components/camel-hdfs/src/main/docs/hdfs-component.adoc
index 0b1391805b7..7ee2e445340 100644
--- a/components/camel-hdfs/src/main/docs/hdfs-component.adoc
+++ b/components/camel-hdfs/src/main/docs/hdfs-component.adoc
@@ -75,6 +75,7 @@ include::partial$component-endpoint-headers.adoc[]
 * BYTE for writing a byte, the java Byte class is mapped into a BYTE
 * BYTES for writing a sequence of bytes. It maps the java ByteBuffer
 class
+* SHORT for writing java short
 * INT for writing java integer
 * FLOAT for writing java float
 * LONG for writing java long
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
index f414e5223e0..2b29253e26e 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
@@ -93,6 +94,7 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
             writables.put(ByteBuffer.class, new HdfsWritableFactories.HdfsBytesWritableFactory());
             writables.put(Double.class, new HdfsWritableFactories.HdfsDoubleWritableFactory());
             writables.put(Float.class, new HdfsWritableFactories.HdfsFloatWritableFactory());
+            writables.put(Short.class, new HdfsWritableFactories.HdfsShortWritableFactory());
             writables.put(Integer.class, new HdfsWritableFactories.HdfsIntWritableFactory());
             writables.put(Long.class, new HdfsWritableFactories.HdfsLongWritableFactory());
             writables.put(String.class, new HdfsWritableFactories.HdfsTextWritableFactory());
@@ -105,6 +107,7 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
             readables.put(BytesWritable.class, new HdfsWritableFactories.HdfsBytesWritableFactory());
             readables.put(DoubleWritable.class, new HdfsWritableFactories.HdfsDoubleWritableFactory());
             readables.put(FloatWritable.class, new HdfsWritableFactories.HdfsFloatWritableFactory());
+            readables.put(ShortWritable.class, new HdfsWritableFactories.HdfsShortWritableFactory());
             readables.put(IntWritable.class, new HdfsWritableFactories.HdfsIntWritableFactory());
             readables.put(LongWritable.class, new HdfsWritableFactories.HdfsLongWritableFactory());
             readables.put(Text.class, new HdfsWritableFactories.HdfsTextWritableFactory());
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
index 7f2de0f4319..a53f0e1d80b 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
@@ -155,6 +156,25 @@ public class HdfsWritableFactories {
         }
     }
 
+    public static final class HdfsShortWritableFactory implements HdfsWritableFactory {
+
+        private static final int SIZE = 2;
+
+        @Override
+        public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
+            size.setValue(SIZE);
+            ShortWritable writable = new ShortWritable();
+            writable.set(typeConverter.convertTo(Short.class, value));
+            return writable;
+        }
+
+        @Override
+        public Object read(Writable writable, Holder<Integer> size) {
+            size.setValue(SIZE);
+            return ((ShortWritable) writable).get();
+        }
+    }
+
     public static final class HdfsIntWritableFactory implements HdfsWritableFactory {
 
         private static final int SIZE = 4;
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/WritableType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/WritableType.java
index ccd416e8f8d..35d9c8af017 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/WritableType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/WritableType.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -50,6 +51,13 @@ public enum WritableType {
         }
     },
 
+    SHORT {
+        @Override
+        public Class<ShortWritable> getWritableClass() {
+            return ShortWritable.class;
+        }
+    },
+
     INT {
         @Override
         public Class<IntWritable> getWritableClass() {
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
index 6dcb7609824..bae67afd144 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -135,6 +136,23 @@ public class HdfsProducerTest extends HdfsTestSupport {
         IOHelper.close(reader);
     }
 
+    @Test
+    public void testWriteShort() throws Exception {
+        short aShort = 32767;
+        template.sendBody("direct:write_short", aShort);
+
+        Configuration conf = new Configuration();
+        Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-short");
+        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file1));
+        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+        Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+        reader.next(key, value);
+        short rShort = ((ShortWritable) value).get();
+        assertEquals(rShort, aShort);
+
+        IOHelper.close(reader);
+    }
+
     @Test
     public void testWriteInt() throws Exception {
         int anInt = 1234;
@@ -443,6 +461,9 @@ public class HdfsProducerTest extends HdfsTestSupport {
                 from("direct:write_byte").to("hdfs:localhost/" + TEMP_DIR.toUri()
                                              + "/test-camel-byte?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE");
 
+                from("direct:write_short").to("hdfs:localhost/" + TEMP_DIR.toUri()
+                                              + "/test-camel-short?fileSystemType=LOCAL&valueType=SHORT&fileType=SEQUENCE_FILE");
+
                 from("direct:write_int").to("hdfs:localhost/" + TEMP_DIR.toUri()
                                             + "/test-camel-int?fileSystemType=LOCAL&valueType=INT&fileType=SEQUENCE_FILE");