Posted to commits@camel.apache.org by or...@apache.org on 2022/12/06 11:48:04 UTC

[camel] 01/02: CAMEL-18792: camel-hdfs - Make integration tests runnable without a real cluster

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d27a54c4685af3f9486c34ed972bd8b80cb577f8
Author: Kengo Seki <se...@apache.org>
AuthorDate: Sat Dec 3 14:16:18 2022 +0900

    CAMEL-18792: camel-hdfs - Make integration tests runnable without a real cluster
---
 components/camel-hdfs/pom.xml                      |   6 +
 .../component/hdfs/integration/HdfsAppendIT.java   |  62 ++--
 .../integration/HdfsConsumerIntegrationIT.java     | 312 +++++++++------------
 ...java => HdfsProducerConsumerIntegrationIT.java} |  43 +--
 4 files changed, 196 insertions(+), 227 deletions(-)
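
The core technique in this commit is standing up an in-process HDFS with
hadoop-minicluster so the tests no longer need an external cluster. A minimal
sketch of that lifecycle, using only builder calls that appear in the diff
below (the class name and target directory are illustrative, not from the
commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class MiniDfsSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Fixed NameNode port so the hdfs://localhost:9000 URIs used by the tests resolve.
            MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                    .nameNodePort(9000).numDataNodes(3).format(true).build();
            cluster.waitActive();
            try {
                FileSystem fs = cluster.getFileSystem();
                fs.mkdirs(new Path("/tmp/test"));
            } finally {
                cluster.shutdown(); // stops the NameNode and all DataNodes
            }
        }
    }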

diff --git a/components/camel-hdfs/pom.xml b/components/camel-hdfs/pom.xml
index 944f3456ecf..5732f2a2ea1 100644
--- a/components/camel-hdfs/pom.xml
+++ b/components/camel-hdfs/pom.xml
@@ -144,6 +144,12 @@
             <artifactId>log4j-slf4j-impl</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <version>${hadoop3-version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.lz4</groupId>
             <artifactId>lz4-java</artifactId>
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendIT.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendIT.java
index 8d59650287e..2ece9214a94 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendIT.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendIT.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -44,6 +45,9 @@ public class HdfsAppendIT extends CamelTestSupport {
 
     private static final int ITERATIONS = 10;
 
+    private MiniDFSCluster cluster;
+    private Configuration conf;
+
     @Override
     public boolean isUseRouteBuilder() {
         return false;
@@ -54,25 +58,25 @@ public class HdfsAppendIT extends CamelTestSupport {
     public void setUp() throws Exception {
         super.setUp();
 
-        Configuration conf = new Configuration();
+        conf = new Configuration();
         if (SystemUtils.IS_OS_MAC) {
             conf.addResource("hdfs-mac-test.xml");
         } else {
             conf.addResource("hdfs-test.xml");
         }
+        conf.set("dfs.namenode.fs-limits.max-directory-items", "1048576");
+        cluster = new MiniDFSCluster.Builder(conf).nameNodePort(9000).numDataNodes(3).format(true).build();
+        cluster.waitActive();
+
         String path = String.format("hdfs://%s:%d/tmp/test/test-camel-simple-write-file1", service.getHDFSHost(),
                 service.getPort());
-
         Path file = new Path(path);
         FileSystem fs = FileSystem.get(file.toUri(), conf);
-        if (fs.exists(file)) {
-            fs.delete(file, true);
-        }
-        FSDataOutputStream out = fs.create(file);
-        for (int i = 0; i < 10; ++i) {
-            out.write("PIPPO".getBytes("UTF-8"));
+        try (FSDataOutputStream out = fs.create(file)) {
+            for (int i = 0; i < 10; ++i) {
+                out.write("PIPPO".getBytes("UTF-8"));
+            }
         }
-        out.close();
     }
 
     @Test
@@ -91,21 +95,20 @@ public class HdfsAppendIT extends CamelTestSupport {
             template.sendBody("direct:start1", "PIPPQ");
         }
 
-        Configuration conf = new Configuration();
         String path = String.format("hdfs://%s:%d/tmp/test/test-camel-simple-write-file1", service.getHDFSHost(),
                 service.getPort());
         Path file = new Path(path);
         FileSystem fs = FileSystem.get(file.toUri(), conf);
-        FSDataInputStream in = fs.open(file);
-        byte[] buffer = new byte[5];
         int ret = 0;
-        for (int i = 0; i < 20; ++i) {
+        try (FSDataInputStream in = fs.open(file)) {
+            byte[] buffer = new byte[5];
+            for (int i = 0; i < 20; ++i) {
+                assertEquals(5, in.read(buffer));
+                LOG.info("> {}", new String(buffer));
+            }
             ret = in.read(buffer);
-            LOG.info("> {}", new String(buffer));
         }
-        ret = in.read(buffer);
         assertEquals(-1, ret);
-        in.close();
     }
 
     @Test
@@ -124,34 +127,29 @@ public class HdfsAppendIT extends CamelTestSupport {
             template.sendBodyAndHeader("direct:start1", "HELLO", Exchange.FILE_NAME, "camel-hdfs.log");
         }
 
-        Configuration conf = new Configuration();
         String path = String.format("hdfs://%s:%d/tmp/test-dynamic/camel-hdfs.log", service.getHDFSHost(),
                 service.getPort());
 
         Path file = new Path(path);
         FileSystem fs = FileSystem.get(file.toUri(), conf);
-        FSDataInputStream in = fs.open(file);
-        byte[] buffer = new byte[5];
-        for (int i = 0; i < ITERATIONS; ++i) {
-            assertEquals(5, in.read(buffer));
-            LOG.info("> {}", new String(buffer));
+        int ret = 0;
+        try (FSDataInputStream in = fs.open(file)) {
+            byte[] buffer = new byte[5];
+            for (int i = 0; i < ITERATIONS; ++i) {
+                assertEquals(5, in.read(buffer));
+                LOG.info("> {}", new String(buffer));
+            }
+            ret = in.read(buffer);
         }
-        int ret = in.read(buffer);
         assertEquals(-1, ret);
-        in.close();
     }
 
     @Override
     @AfterEach
     public void tearDown() throws Exception {
         super.tearDown();
-
-        Thread.sleep(250);
-        Configuration conf = new Configuration();
-        Path dir = new Path(String.format("hdfs://%s:%d/tmp/test", service.getHDFSHost(), service.getPort()));
-        FileSystem fs = FileSystem.get(dir.toUri(), conf);
-        fs.delete(dir, true);
-        dir = new Path(String.format("hdfs://%s:%d/tmp/test-dynamic", service.getHDFSHost(), service.getPort()));
-        fs.delete(dir, true);
+        if (cluster != null) {
+            cluster.shutdown();
+        }
     }
 }
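
The recurring cleanup in this file replaces manual close() calls with
try-with-resources, so streams are closed even when an assertion fails
mid-loop. The same pattern applies to any HDFS stream pair; a hypothetical
snippet (not taken from the diff, assuming a Configuration conf in scope):

    // Hypothetical illustration of the try-with-resources pattern used above.
    Path file = new Path("hdfs://localhost:9000/tmp/test/example");
    FileSystem fs = FileSystem.get(file.toUri(), conf);
    try (FSDataOutputStream out = fs.create(file)) {
        out.write("PIPPO".getBytes("UTF-8"));
    }
    try (FSDataInputStream in = fs.open(file)) {
        byte[] buffer = new byte[5];
        int n = in.read(buffer); // 5 here; a subsequent read at EOF returns -1
    }
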
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsConsumerIntegrationIT.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsConsumerIntegrationIT.java
index 2907957892c..52dea7acb22 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsConsumerIntegrationIT.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsConsumerIntegrationIT.java
@@ -17,10 +17,7 @@
 package org.apache.camel.component.hdfs.integration;
 
 import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -37,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.ArrayFile;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.ByteWritable;
@@ -49,20 +47,21 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.Progressable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import static org.apache.camel.test.junit5.TestSupport.deleteDirectory;
-import static org.apache.hadoop.io.SequenceFile.CompressionType;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
     private static final int ITERATIONS = 200;
 
+    private MiniDFSCluster cluster;
+    private FileSystem fs;
+    private Configuration conf;
+    private Path dir;
+
     @Override
     public boolean isUseRouteBuilder() {
         return false;
@@ -73,6 +72,15 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
     public void setUp() throws Exception {
         checkTest();
 
+        conf = new Configuration();
+        conf.set("dfs.namenode.fs-limits.max-directory-items", "1048576");
+        cluster = new MiniDFSCluster.Builder(conf).nameNodePort(9000).numDataNodes(3).format(true).build();
+        cluster.waitActive();
+
+        dir = new Path("hdfs://localhost:9000/tmp/test");
+        fs = FileSystem.get(dir.toUri(), conf);
+        fs.mkdirs(dir);
+
         // must be able to get security configuration
         try {
             javax.security.auth.login.Configuration.getConfiguration();
@@ -80,28 +88,25 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
             return;
         }
 
-        deleteDirectory("target/test");
         super.setUp();
     }
 
     @Test
     public void testSimpleConsumer() throws Exception {
-        final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath());
-        Configuration conf = new Configuration();
-        FileSystem fs = FileSystem.get(file.toUri(), conf);
-        FSDataOutputStream out = fs.create(file);
-        for (int i = 0; i < 1024; ++i) {
-            out.write(("PIPPO" + i).getBytes("UTF-8"));
-            out.flush();
+        final String file = "test-camel-normal-file";
+        try (FSDataOutputStream out = fs.create(new Path(dir, file))) {
+            for (int i = 0; i < 1024; ++i) {
+                out.write(("PIPPO" + i).getBytes("UTF-8"));
+                out.flush();
+            }
         }
-        out.close();
 
         MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(2);
 
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=4096&initialDelay=0")
+                fromF("%s?pattern=%s&fileSystemType=HDFS&chunkSize=4096&initialDelay=0", dir.toUri(), file)
                         .to("mock:result");
             }
         });
@@ -112,13 +117,12 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
 
     @Test
     public void testConcurrentConsumers() throws Exception {
-        final File rootdir = CWD;
-        final File dir = new File("target/test/multiple-consumers");
-        dir.mkdirs();
+        final Path dir = new Path(this.dir, "multiple-consumers");
+        fs.mkdirs(dir);
         for (int i = 1; i <= ITERATIONS; i++) {
-            FileOutputStream fos = new FileOutputStream(new File(dir, String.format("file-%04d.txt", i)));
-            fos.write(String.format("hello (%04d)\n", i).getBytes());
-            fos.close();
+            try (FSDataOutputStream fos = fs.create(new Path(dir, String.format("file-%04d.txt", i)))) {
+                fos.write(String.format("hello (%04d)\n", i).getBytes());
+            }
         }
 
         final Set<String> fileNames = new HashSet<>();
@@ -134,17 +138,17 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
 
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("hdfs://" + rootdir.toURI()
-                     + "/target/test/multiple-consumers?pattern=*.txt&fileSystemType=LOCAL&chunkSize=100&initialDelay=0")
+                from(dir.toUri()
+                     + "?pattern=*.txt&fileSystemType=HDFS&chunkSize=100&initialDelay=0")
                              .to("mock:result");
-                from("hdfs://" + rootdir.toURI()
-                     + "/target/test/multiple-consumers?pattern=*.txt&fileSystemType=LOCAL&chunkSize=200&initialDelay=0")
+                from(dir.toUri()
+                     + "?pattern=*.txt&fileSystemType=HDFS&chunkSize=200&initialDelay=0")
                              .to("mock:result");
-                from("hdfs://" + rootdir.toURI()
-                     + "/target/test/multiple-consumers?pattern=*.txt&fileSystemType=LOCAL&chunkSize=300&initialDelay=0")
+                from(dir.toUri()
+                     + "?pattern=*.txt&fileSystemType=HDFS&chunkSize=300&initialDelay=0")
                              .to("mock:result");
-                from("hdfs://" + rootdir.toURI()
-                     + "/target/test/multiple-consumers?pattern=*.txt&fileSystemType=LOCAL&chunkSize=400&initialDelay=0")
+                from(dir.toUri()
+                     + "pattern=*.txt&fileSystemType=HDFS&chunkSize=400&initialDelay=0")
                              .to("mock:result");
             }
         });
@@ -155,16 +159,13 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
         latch.await(30, TimeUnit.SECONDS);
 
         resultEndpoint.assertIsSatisfied();
-        assertThat(fileNames.size(), equalTo(ITERATIONS));
+        assertEquals(ITERATIONS, fileNames.size());
     }
 
     @Test
     public void testSimpleConsumerWithEmptyFile() throws Exception {
-        final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath());
-        Configuration conf = new Configuration();
-        FileSystem fs = FileSystem.get(file.toUri(), conf);
-        FSDataOutputStream out = fs.create(file);
-        out.close();
+        final String file = "test-camel-normal-file";
+        fs.createNewFile(new Path(dir, file));
 
         MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
         // TODO: See comment from Claus at ticket: https://issues.apache.org/jira/browse/CAMEL-8434
@@ -172,7 +173,7 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
 
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=4096&initialDelay=0")
+                fromF("%s?pattern=%s&fileSystemType=HDFS&chunkSize=4096&initialDelay=0", dir.toUri(), file)
                         .to("mock:result");
             }
         });
@@ -181,55 +182,54 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
         Thread.sleep(2000);
 
         resultEndpoint.assertIsSatisfied();
-        assertThat(
+        assertEquals(
                 resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length,
-                equalTo(0));
+                0);
     }
 
     @Test
     public void testSimpleConsumerFileWithSizeEqualToNChunks() throws Exception {
-        final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath());
-        Configuration conf = new Configuration();
-        FileSystem fs = FileSystem.get(file.toUri(), conf);
-        FSDataOutputStream out = fs.create(file);
-        // size = 5 times chunk size = 210 bytes
-        for (int i = 0; i < 42; ++i) {
-            out.write(new byte[] { 0x61, 0x62, 0x63, 0x64, 0x65 });
-            out.flush();
+        final String file = "test-camel-normal-file";
+        try (FSDataOutputStream out = fs.create(new Path(dir, file))) {
+            // size = 5 times chunk size = 210 bytes
+            for (int i = 0; i < 42; ++i) {
+                out.write(new byte[] { 0x61, 0x62, 0x63, 0x64, 0x65 });
+                out.flush();
+            }
         }
-        out.close();
 
         MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(5);
 
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=42&initialDelay=0").to("mock:result");
+                fromF("%s?pattern=%s&fileSystemType=HDFS&chunkSize=42&initialDelay=0", dir.toUri(), file)
+                        .to("mock:result");
             }
         });
         context.start();
 
         resultEndpoint.assertIsSatisfied();
-        assertThat(
+        assertEquals(
                 resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length,
-                equalTo(42));
+                42);
     }
 
     @Test
     public void testSimpleConsumerWithEmptySequenceFile() throws Exception {
-        final Path file = new Path(new File("target/test/test-camel-sequence-file").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BooleanWritable.class);
-        writer.sync();
-        writer.close();
+        final String file = "test-camel-sequence-file";
+        try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, BooleanWritable.class)) {
+            writer.sync();
+        }
 
         MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(0);
 
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("hdfs:localhost/" + file.toUri()
-                     + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&chunkSize=4096&initialDelay=0").to("mock:result");
+                fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&chunkSize=4096&initialDelay=0", dir.toUri(),
+                        file)
+                                .to("mock:result");
             }
         });
         context.start();
@@ -239,23 +239,18 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
 
     @Test
     public void testReadWithReadSuffix() throws Exception {
-        String[] beforeFiles = new File("target/test").list();
-        int before = beforeFiles != null ? beforeFiles.length : 0;
-
-        final Path file = new Path(new File("target/test/test-camel-boolean").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BooleanWritable.class);
+        final String file = "test-camel-boolean";
         NullWritable keyWritable = NullWritable.get();
-        BooleanWritable valueWritable = new BooleanWritable();
-        valueWritable.set(true);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
+        BooleanWritable valueWritable = new BooleanWritable(true);
+        try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, BooleanWritable.class)) {
+            writer.append(keyWritable, valueWritable);
+            writer.sync();
+        }
 
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("hdfs:localhost/" + file.getParent().toUri()
-                     + "?scheduler=#myScheduler&pattern=*&fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0&readSuffix=handled")
+                from(dir.toUri()
+                     + "?scheduler=#myScheduler&pattern=*&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0&readSuffix=handled")
                              .to("mock:result");
             }
         });
@@ -272,28 +267,23 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
         scheduler.getScheduledExecutorService().shutdown();
         scheduler.getScheduledExecutorService().awaitTermination(5000, TimeUnit.MILLISECONDS);
 
-        Set<String> files = new HashSet<>(Arrays.asList(new File("target/test").list()));
-        // there may be some leftover files before, so test that we only added 2 new files
-        assertThat(files.size() - before, equalTo(2));
-        assertTrue(files.remove("test-camel-boolean.handled"));
-        assertTrue(files.remove(".test-camel-boolean.handled.crc"));
+        assertEquals(1, fs.listStatus(dir).length);
+        assertTrue(fs.delete(new Path(dir, file + ".handled")));
     }
 
     @Test
     public void testReadBoolean() throws Exception {
-        final Path file = new Path(new File("target/test/test-camel-boolean").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BooleanWritable.class);
+        final String file = "test-camel-boolean";
         NullWritable keyWritable = NullWritable.get();
-        BooleanWritable valueWritable = new BooleanWritable();
-        valueWritable.set(true);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
+        BooleanWritable valueWritable = new BooleanWritable(true);
+        try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, BooleanWritable.class)) {
+            writer.append(keyWritable, valueWritable);
+            writer.sync();
+        }
 
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+                fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
                         .to("mock:result");
             }
         });
@@ -306,16 +296,13 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
 
     @Test
     public void testReadByte() throws Exception {
-        final Path file = new Path(new File("target/test/test-camel-byte").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, ByteWritable.class);
+        final String file = "test-camel-byte";
         NullWritable keyWritable = NullWritable.get();
-        ByteWritable valueWritable = new ByteWritable();
-        byte value = 3;
-        valueWritable.set(value);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
+        ByteWritable valueWritable = new ByteWritable((byte) 3);
+        try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, ByteWritable.class)) {
+            writer.append(keyWritable, valueWritable);
+            writer.sync();
+        }
 
         MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(1);
@@ -323,7 +310,7 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
 
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+                fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
                         .to("mock:result");
             }
         });
@@ -334,23 +321,20 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
 
     @Test
     public void testReadFloat() throws Exception {
-        final Path file = new Path(new File("target/test/test-camel-float").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, FloatWritable.class);
+        final String file = "test-camel-float";
         NullWritable keyWritable = NullWritable.get();
-        FloatWritable valueWritable = new FloatWritable();
-        float value = 3.1415926535f;
-        valueWritable.set(value);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
+        FloatWritable valueWritable = new FloatWritable(3.1415926535f);
+        try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, FloatWritable.class)) {
+            writer.append(keyWritable, valueWritable);
+            writer.sync();
+        }
 
         MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(1);
 
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+                fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
                         .to("mock:result");
             }
         });
@@ -361,23 +345,20 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
 
     @Test
     public void testReadDouble() throws Exception {
-        final Path file = new Path(new File("target/test/test-camel-double").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, DoubleWritable.class);
+        final String file = "test-camel-double";
         NullWritable keyWritable = NullWritable.get();
-        DoubleWritable valueWritable = new DoubleWritable();
-        double value = 3.1415926535;
-        valueWritable.set(value);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
+        DoubleWritable valueWritable = new DoubleWritable(3.1415926535);
+        try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, DoubleWritable.class)) {
+            writer.append(keyWritable, valueWritable);
+            writer.sync();
+        }
 
         MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(1);
 
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+                fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
                         .to("mock:result");
             }
         });
@@ -388,23 +369,20 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
 
     @Test
     public void testReadInt() throws Exception {
-        final Path file = new Path(new File("target/test/test-camel-int").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, IntWritable.class);
+        final String file = "test-camel-int";
         NullWritable keyWritable = NullWritable.get();
-        IntWritable valueWritable = new IntWritable();
-        int value = 314159265;
-        valueWritable.set(value);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
+        IntWritable valueWritable = new IntWritable(314159265);
+        try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, IntWritable.class)) {
+            writer.append(keyWritable, valueWritable);
+            writer.sync();
+        }
 
         MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(1);
 
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+                fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
                         .to("mock:result");
             }
         });
@@ -415,23 +393,20 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
 
     @Test
     public void testReadLong() throws Exception {
-        final Path file = new Path(new File("target/test/test-camel-long").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, LongWritable.class);
+        final String file = "test-camel-long";
         NullWritable keyWritable = NullWritable.get();
-        LongWritable valueWritable = new LongWritable();
-        long value = 31415926535L;
-        valueWritable.set(value);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
+        LongWritable valueWritable = new LongWritable(31415926535L);
+        try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, LongWritable.class)) {
+            writer.append(keyWritable, valueWritable);
+            writer.sync();
+        }
 
         MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(1);
 
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+                fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
                         .to("mock:result");
             }
         });
@@ -442,23 +417,20 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
 
     @Test
     public void testReadBytes() throws Exception {
-        final Path file = new Path(new File("target/test/test-camel-bytes").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BytesWritable.class);
+        final String file = "test-camel-bytes";
         NullWritable keyWritable = NullWritable.get();
-        BytesWritable valueWritable = new BytesWritable();
-        String value = "CIAO!";
-        valueWritable.set(value.getBytes(), 0, value.getBytes().length);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
+        BytesWritable valueWritable = new BytesWritable("CIAO!".getBytes());
+        try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, BytesWritable.class)) {
+            writer.append(keyWritable, valueWritable);
+            writer.sync();
+        }
 
         MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(1);
 
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+                fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
                         .to("mock:result");
             }
         });
@@ -469,23 +441,20 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
 
     @Test
     public void testReadString() throws Exception {
-        final Path file = new Path(new File("target/test/test-camel-string").getAbsolutePath());
-        Configuration conf = new Configuration();
-        SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, Text.class);
+        final String file = "test-camel-string";
         NullWritable keyWritable = NullWritable.get();
-        Text valueWritable = new Text();
-        String value = "CIAO!";
-        valueWritable.set(value);
-        writer.append(keyWritable, valueWritable);
-        writer.sync();
-        writer.close();
+        Text valueWritable = new Text("CIAO!");
+        try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, Text.class)) {
+            writer.append(keyWritable, valueWritable);
+            writer.sync();
+        }
 
         MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(1);
 
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+                fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
                         .to("mock:result");
             }
         });
@@ -496,27 +465,18 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
 
     @Test
     public void testReadStringArrayFile() throws Exception {
-        final Path file = new Path(new File("target/test/test-camel-string").getAbsolutePath());
-        Configuration conf = new Configuration();
-        FileSystem fs1 = FileSystem.get(file.toUri(), conf);
-        ArrayFile.Writer writer = new ArrayFile.Writer(
-                conf, fs1, "target/test/test-camel-string1", Text.class, CompressionType.NONE, new Progressable() {
-                    @Override
-                    public void progress() {
-                    }
-                });
-        Text valueWritable = new Text();
-        String value = "CIAO!";
-        valueWritable.set(value);
-        writer.append(valueWritable);
-        writer.close();
+        final String file = "test-camel-string";
+        Text valueWritable = new Text("CIAO!");
+        try (ArrayFile.Writer writer = new ArrayFile.Writer(conf, fs, dir.toUri() + "/" + file, Text.class)) {
+            writer.append(valueWritable);
+        }
 
         MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
         resultEndpoint.expectedMessageCount(1);
 
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("hdfs:localhost/" + file.getParent().toUri() + "?fileSystemType=LOCAL&fileType=ARRAY_FILE&initialDelay=0")
+                fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=ARRAY_FILE&initialDelay=0", dir.toUri(), file)
                         .to("mock:result");
             }
         });
@@ -529,11 +489,9 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
     @AfterEach
     public void tearDown() throws Exception {
         super.tearDown();
-        Thread.sleep(100);
-        Configuration conf = new Configuration();
-        Path dir = new Path("target/test");
-        FileSystem fs = FileSystem.get(dir.toUri(), conf);
-        fs.delete(dir, true);
+        if (cluster != null) {
+            cluster.shutdown();
+        }
     }
 
     private Writer createWriter(
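
The private createWriter helper is truncated by the hunk above; its actual
body is not shown in this commit. A plausible shape for it, assuming the
option-based SequenceFile.createWriter API available since Hadoop 2, would be:

    // Hypothetical body for the helper; the real implementation may differ.
    private SequenceFile.Writer createWriter(
            Configuration conf, Path file, Class<?> keyClass, Class<?> valueClass)
            throws IOException {
        return SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(file),
                SequenceFile.Writer.keyClass(keyClass),
                SequenceFile.Writer.valueClass(valueClass));
    }
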
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationManualIT.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationIT.java
similarity index 83%
rename from components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationManualIT.java
rename to components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationIT.java
index b2351938838..bce92eb8021 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationManualIT.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationIT.java
@@ -31,23 +31,34 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@Disabled("Must run manual")
-public class HdfsProducerConsumerIntegrationManualIT extends CamelTestSupport {
+public class HdfsProducerConsumerIntegrationIT extends CamelTestSupport {
     private static final int ITERATIONS = 400;
 
+    private MiniDFSCluster cluster;
+
     @Override
     public boolean isUseRouteBuilder() {
         return false;
     }
 
+    @Override
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        Configuration conf = new Configuration();
+        conf.set("dfs.namenode.fs-limits.max-directory-items", "1048576");
+        cluster = new MiniDFSCluster.Builder(conf).nameNodePort(9000).numDataNodes(3).format(true).build();
+        cluster.waitActive();
+    }
+
     @Test
     public void testSimpleSplitWriteRead() throws Exception {
         context.addRoutes(new RouteBuilder() {
@@ -79,7 +90,7 @@ public class HdfsProducerConsumerIntegrationManualIT extends CamelTestSupport {
             String text = exchange.getIn().getBody(String.class);
             sent.remove(text);
         }
-        assertThat(sent.isEmpty(), is(true));
+        assertTrue(sent.isEmpty());
     }
 
     @Test
@@ -90,9 +101,9 @@ public class HdfsProducerConsumerIntegrationManualIT extends CamelTestSupport {
         FileSystem fs = FileSystem.get(p.toUri(), new Configuration());
         fs.mkdirs(p);
         for (int i = 1; i <= ITERATIONS; i++) {
-            FSDataOutputStream os = fs.create(new Path(p, String.format("file-%03d.txt", i)));
-            os.write(String.format("hello (%03d)\n", i).getBytes());
-            os.close();
+            try (FSDataOutputStream os = fs.create(new Path(p, String.format("file-%03d.txt", i)))) {
+                os.write(String.format("hello (%03d)\n", i).getBytes());
+            }
         }
 
         final Set<String> fileNames = new HashSet<>();
@@ -127,20 +138,16 @@ public class HdfsProducerConsumerIntegrationManualIT extends CamelTestSupport {
         latch.await(30, TimeUnit.SECONDS);
 
         resultEndpoint.assertIsSatisfied();
-        assertThat(fileNames.size(), equalTo(ITERATIONS));
+        assertEquals(ITERATIONS, fileNames.size());
     }
 
     @Override
     @AfterEach
     public void tearDown() throws Exception {
         super.tearDown();
-
-        Thread.sleep(250);
-        Configuration conf = new Configuration();
-        Path dir = new Path("hdfs://localhost:9000/tmp/test");
-        FileSystem fs = FileSystem.get(dir.toUri(), conf);
-        fs.delete(dir, true);
-        fs.delete(new Path("hdfs://localhost:9000/tmp/test/multiple-consumers"), true);
+        if (cluster != null) {
+            cluster.shutdown();
+        }
     }
 
 }
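
With the MiniDFSCluster in place, these ITs should run self-contained under
Maven Failsafe, e.g. from the repository root (standard invocation; any
Camel-specific test profiles are not shown in this commit):

    mvn verify -pl components/camel-hdfs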