You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/12/06 11:48:04 UTC
[camel] 01/02: CAMEL-18792: camel-hdfs - Make integration tests runnable without a real cluster
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit d27a54c4685af3f9486c34ed972bd8b80cb577f8
Author: Kengo Seki <se...@apache.org>
AuthorDate: Sat Dec 3 14:16:18 2022 +0900
CAMEL-18792: camel-hdfs - Make integration tests runnable without a real cluster
---
components/camel-hdfs/pom.xml | 6 +
.../component/hdfs/integration/HdfsAppendIT.java | 62 ++--
.../integration/HdfsConsumerIntegrationIT.java | 312 +++++++++------------
...java => HdfsProducerConsumerIntegrationIT.java} | 43 +--
4 files changed, 196 insertions(+), 227 deletions(-)
diff --git a/components/camel-hdfs/pom.xml b/components/camel-hdfs/pom.xml
index 944f3456ecf..5732f2a2ea1 100644
--- a/components/camel-hdfs/pom.xml
+++ b/components/camel-hdfs/pom.xml
@@ -144,6 +144,12 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>${hadoop3-version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendIT.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendIT.java
index 8d59650287e..2ece9214a94 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendIT.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendIT.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -44,6 +45,9 @@ public class HdfsAppendIT extends CamelTestSupport {
private static final int ITERATIONS = 10;
+ private MiniDFSCluster cluster;
+ private Configuration conf;
+
@Override
public boolean isUseRouteBuilder() {
return false;
@@ -54,25 +58,25 @@ public class HdfsAppendIT extends CamelTestSupport {
public void setUp() throws Exception {
super.setUp();
- Configuration conf = new Configuration();
+ conf = new Configuration();
if (SystemUtils.IS_OS_MAC) {
conf.addResource("hdfs-mac-test.xml");
} else {
conf.addResource("hdfs-test.xml");
}
+ conf.set("dfs.namenode.fs-limits.max-directory-items", "1048576");
+ cluster = new MiniDFSCluster.Builder(conf).nameNodePort(9000).numDataNodes(3).format(true).build();
+ cluster.waitActive();
+
String path = String.format("hdfs://%s:%d/tmp/test/test-camel-simple-write-file1", service.getHDFSHost(),
service.getPort());
-
Path file = new Path(path);
FileSystem fs = FileSystem.get(file.toUri(), conf);
- if (fs.exists(file)) {
- fs.delete(file, true);
- }
- FSDataOutputStream out = fs.create(file);
- for (int i = 0; i < 10; ++i) {
- out.write("PIPPO".getBytes("UTF-8"));
+ try (FSDataOutputStream out = fs.create(file)) {
+ for (int i = 0; i < 10; ++i) {
+ out.write("PIPPO".getBytes("UTF-8"));
+ }
}
- out.close();
}
@Test
@@ -91,21 +95,20 @@ public class HdfsAppendIT extends CamelTestSupport {
template.sendBody("direct:start1", "PIPPQ");
}
- Configuration conf = new Configuration();
String path = String.format("hdfs://%s:%d/tmp/test/test-camel-simple-write-file1", service.getHDFSHost(),
service.getPort());
Path file = new Path(path);
FileSystem fs = FileSystem.get(file.toUri(), conf);
- FSDataInputStream in = fs.open(file);
- byte[] buffer = new byte[5];
int ret = 0;
- for (int i = 0; i < 20; ++i) {
+ try (FSDataInputStream in = fs.open(file)) {
+ byte[] buffer = new byte[5];
+ for (int i = 0; i < 20; ++i) {
+ assertEquals(5, in.read(buffer));
+ LOG.info("> {}", new String(buffer));
+ }
ret = in.read(buffer);
- LOG.info("> {}", new String(buffer));
}
- ret = in.read(buffer);
assertEquals(-1, ret);
- in.close();
}
@Test
@@ -124,34 +127,29 @@ public class HdfsAppendIT extends CamelTestSupport {
template.sendBodyAndHeader("direct:start1", "HELLO", Exchange.FILE_NAME, "camel-hdfs.log");
}
- Configuration conf = new Configuration();
String path = String.format("hdfs://%s:%d/tmp/test-dynamic/camel-hdfs.log", service.getHDFSHost(),
service.getPort());
Path file = new Path(path);
FileSystem fs = FileSystem.get(file.toUri(), conf);
- FSDataInputStream in = fs.open(file);
- byte[] buffer = new byte[5];
- for (int i = 0; i < ITERATIONS; ++i) {
- assertEquals(5, in.read(buffer));
- LOG.info("> {}", new String(buffer));
+ int ret = 0;
+ try (FSDataInputStream in = fs.open(file)) {
+ byte[] buffer = new byte[5];
+ for (int i = 0; i < ITERATIONS; ++i) {
+ assertEquals(5, in.read(buffer));
+ LOG.info("> {}", new String(buffer));
+ }
+ ret = in.read(buffer);
}
- int ret = in.read(buffer);
assertEquals(-1, ret);
- in.close();
}
@Override
@AfterEach
public void tearDown() throws Exception {
super.tearDown();
-
- Thread.sleep(250);
- Configuration conf = new Configuration();
- Path dir = new Path(String.format("hdfs://%s:%d/tmp/test", service.getHDFSHost(), service.getPort()));
- FileSystem fs = FileSystem.get(dir.toUri(), conf);
- fs.delete(dir, true);
- dir = new Path(String.format("hdfs://%s:%d/tmp/test-dynamic", service.getHDFSHost(), service.getPort()));
- fs.delete(dir, true);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
}
}
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsConsumerIntegrationIT.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsConsumerIntegrationIT.java
index 2907957892c..52dea7acb22 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsConsumerIntegrationIT.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsConsumerIntegrationIT.java
@@ -17,10 +17,7 @@
package org.apache.camel.component.hdfs.integration;
import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -37,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.ArrayFile;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
@@ -49,20 +47,21 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.Progressable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import static org.apache.camel.test.junit5.TestSupport.deleteDirectory;
-import static org.apache.hadoop.io.SequenceFile.CompressionType;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
private static final int ITERATIONS = 200;
+ private MiniDFSCluster cluster;
+ private FileSystem fs;
+ private Configuration conf;
+ private Path dir;
+
@Override
public boolean isUseRouteBuilder() {
return false;
@@ -73,6 +72,15 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
public void setUp() throws Exception {
checkTest();
+ conf = new Configuration();
+ conf.set("dfs.namenode.fs-limits.max-directory-items", "1048576");
+ cluster = new MiniDFSCluster.Builder(conf).nameNodePort(9000).numDataNodes(3).format(true).build();
+ cluster.waitActive();
+
+ dir = new Path("hdfs://localhost:9000/tmp/test");
+ fs = FileSystem.get(dir.toUri(), conf);
+ fs.mkdirs(dir);
+
// must be able to get security configuration
try {
javax.security.auth.login.Configuration.getConfiguration();
@@ -80,28 +88,25 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
return;
}
- deleteDirectory("target/test");
super.setUp();
}
@Test
public void testSimpleConsumer() throws Exception {
- final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath());
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(file.toUri(), conf);
- FSDataOutputStream out = fs.create(file);
- for (int i = 0; i < 1024; ++i) {
- out.write(("PIPPO" + i).getBytes("UTF-8"));
- out.flush();
+ final String file = "test-camel-normal-file";
+ try (FSDataOutputStream out = fs.create(new Path(dir, file))) {
+ for (int i = 0; i < 1024; ++i) {
+ out.write(("PIPPO" + i).getBytes("UTF-8"));
+ out.flush();
+ }
}
- out.close();
MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(2);
context.addRoutes(new RouteBuilder() {
public void configure() {
- from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=4096&initialDelay=0")
+ fromF("%s?pattern=%s&fileSystemType=HDFS&chunkSize=4096&initialDelay=0", dir.toUri(), file)
.to("mock:result");
}
});
@@ -112,13 +117,12 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
@Test
public void testConcurrentConsumers() throws Exception {
- final File rootdir = CWD;
- final File dir = new File("target/test/multiple-consumers");
- dir.mkdirs();
+ final Path dir = new Path(this.dir, "multiple-consumers");
+ fs.mkdirs(dir);
for (int i = 1; i <= ITERATIONS; i++) {
- FileOutputStream fos = new FileOutputStream(new File(dir, String.format("file-%04d.txt", i)));
- fos.write(String.format("hello (%04d)\n", i).getBytes());
- fos.close();
+ try (FSDataOutputStream fos = fs.create(new Path(dir, String.format("file-%04d.txt", i)))) {
+ fos.write(String.format("hello (%04d)\n", i).getBytes());
+ }
}
final Set<String> fileNames = new HashSet<>();
@@ -134,17 +138,17 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
context.addRoutes(new RouteBuilder() {
public void configure() {
- from("hdfs://" + rootdir.toURI()
- + "/target/test/multiple-consumers?pattern=*.txt&fileSystemType=LOCAL&chunkSize=100&initialDelay=0")
+ from(dir.toUri()
+ + "?pattern=*.txt&fileSystemType=HDFS&chunkSize=100&initialDelay=0")
.to("mock:result");
- from("hdfs://" + rootdir.toURI()
- + "/target/test/multiple-consumers?pattern=*.txt&fileSystemType=LOCAL&chunkSize=200&initialDelay=0")
+ from(dir.toUri()
+ + "?pattern=*.txt&fileSystemType=HDFS&chunkSize=200&initialDelay=0")
.to("mock:result");
- from("hdfs://" + rootdir.toURI()
- + "/target/test/multiple-consumers?pattern=*.txt&fileSystemType=LOCAL&chunkSize=300&initialDelay=0")
+ from(dir.toUri()
+ + "?pattern=*.txt&fileSystemType=HDFS&chunkSize=300&initialDelay=0")
.to("mock:result");
- from("hdfs://" + rootdir.toURI()
- + "/target/test/multiple-consumers?pattern=*.txt&fileSystemType=LOCAL&chunkSize=400&initialDelay=0")
+ from(dir.toUri()
+ + "?pattern=*.txt&fileSystemType=HDFS&chunkSize=400&initialDelay=0")
.to("mock:result");
}
});
@@ -155,16 +159,13 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
latch.await(30, TimeUnit.SECONDS);
resultEndpoint.assertIsSatisfied();
- assertThat(fileNames.size(), equalTo(ITERATIONS));
+ assertEquals(fileNames.size(), ITERATIONS);
}
@Test
public void testSimpleConsumerWithEmptyFile() throws Exception {
- final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath());
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(file.toUri(), conf);
- FSDataOutputStream out = fs.create(file);
- out.close();
+ final String file = "test-camel-normal-file";
+ fs.createNewFile(new Path(dir, file));
MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
// TODO: See comment from Claus at ticket: https://issues.apache.org/jira/browse/CAMEL-8434
@@ -172,7 +173,7 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
context.addRoutes(new RouteBuilder() {
public void configure() {
- from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=4096&initialDelay=0")
+ fromF("%s?pattern=%s&fileSystemType=HDFS&chunkSize=4096&initialDelay=0", dir.toUri(), file)
.to("mock:result");
}
});
@@ -181,55 +182,54 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
Thread.sleep(2000);
resultEndpoint.assertIsSatisfied();
- assertThat(
+ assertEquals(
resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length,
- equalTo(0));
+ 0);
}
@Test
public void testSimpleConsumerFileWithSizeEqualToNChunks() throws Exception {
- final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath());
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(file.toUri(), conf);
- FSDataOutputStream out = fs.create(file);
- // size = 5 times chunk size = 210 bytes
- for (int i = 0; i < 42; ++i) {
- out.write(new byte[] { 0x61, 0x62, 0x63, 0x64, 0x65 });
- out.flush();
+ final String file = "test-camel-normal-file";
+ try (FSDataOutputStream out = fs.create(new Path(dir, file))) {
+ // size = 5 times chunk size = 210 bytes
+ for (int i = 0; i < 42; ++i) {
+ out.write(new byte[] { 0x61, 0x62, 0x63, 0x64, 0x65 });
+ out.flush();
+ }
}
- out.close();
MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(5);
context.addRoutes(new RouteBuilder() {
public void configure() {
- from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=42&initialDelay=0").to("mock:result");
+ fromF("%s?pattern=%s&fileSystemType=HDFS&chunkSize=42&initialDelay=0", dir.toUri(), file)
+ .to("mock:result");
}
});
context.start();
resultEndpoint.assertIsSatisfied();
- assertThat(
+ assertEquals(
resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length,
- equalTo(42));
+ 42);
}
@Test
public void testSimpleConsumerWithEmptySequenceFile() throws Exception {
- final Path file = new Path(new File("target/test/test-camel-sequence-file").getAbsolutePath());
- Configuration conf = new Configuration();
- SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BooleanWritable.class);
- writer.sync();
- writer.close();
+ final String file = "test-camel-sequence-file";
+ try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, BooleanWritable.class)) {
+ writer.sync();
+ }
MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(0);
context.addRoutes(new RouteBuilder() {
public void configure() {
- from("hdfs:localhost/" + file.toUri()
- + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&chunkSize=4096&initialDelay=0").to("mock:result");
+ fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&chunkSize=4096&initialDelay=0", dir.toUri(),
+ file)
+ .to("mock:result");
}
});
context.start();
@@ -239,23 +239,18 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
@Test
public void testReadWithReadSuffix() throws Exception {
- String[] beforeFiles = new File("target/test").list();
- int before = beforeFiles != null ? beforeFiles.length : 0;
-
- final Path file = new Path(new File("target/test/test-camel-boolean").getAbsolutePath());
- Configuration conf = new Configuration();
- SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BooleanWritable.class);
+ final String file = "test-camel-boolean";
NullWritable keyWritable = NullWritable.get();
- BooleanWritable valueWritable = new BooleanWritable();
- valueWritable.set(true);
- writer.append(keyWritable, valueWritable);
- writer.sync();
- writer.close();
+ BooleanWritable valueWritable = new BooleanWritable(true);
+ try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, BooleanWritable.class)) {
+ writer.append(keyWritable, valueWritable);
+ writer.sync();
+ }
context.addRoutes(new RouteBuilder() {
public void configure() {
- from("hdfs:localhost/" + file.getParent().toUri()
- + "?scheduler=#myScheduler&pattern=*&fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0&readSuffix=handled")
+ from(dir.toUri()
+ + "?scheduler=#myScheduler&pattern=*&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0&readSuffix=handled")
.to("mock:result");
}
});
@@ -272,28 +267,23 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
scheduler.getScheduledExecutorService().shutdown();
scheduler.getScheduledExecutorService().awaitTermination(5000, TimeUnit.MILLISECONDS);
- Set<String> files = new HashSet<>(Arrays.asList(new File("target/test").list()));
- // there may be some leftover files before, so test that we only added 2 new files
- assertThat(files.size() - before, equalTo(2));
- assertTrue(files.remove("test-camel-boolean.handled"));
- assertTrue(files.remove(".test-camel-boolean.handled.crc"));
+ assertEquals(fs.listStatus(dir).length, 1);
+ assertTrue(fs.delete(new Path(dir, file + ".handled")));
}
@Test
public void testReadBoolean() throws Exception {
- final Path file = new Path(new File("target/test/test-camel-boolean").getAbsolutePath());
- Configuration conf = new Configuration();
- SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BooleanWritable.class);
+ final String file = "test-camel-boolean";
NullWritable keyWritable = NullWritable.get();
- BooleanWritable valueWritable = new BooleanWritable();
- valueWritable.set(true);
- writer.append(keyWritable, valueWritable);
- writer.sync();
- writer.close();
+ BooleanWritable valueWritable = new BooleanWritable(true);
+ try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, BooleanWritable.class)) {
+ writer.append(keyWritable, valueWritable);
+ writer.sync();
+ }
context.addRoutes(new RouteBuilder() {
public void configure() {
- from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+ fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
.to("mock:result");
}
});
@@ -306,16 +296,13 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
@Test
public void testReadByte() throws Exception {
- final Path file = new Path(new File("target/test/test-camel-byte").getAbsolutePath());
- Configuration conf = new Configuration();
- SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, ByteWritable.class);
+ final String file = "test-camel-byte";
NullWritable keyWritable = NullWritable.get();
- ByteWritable valueWritable = new ByteWritable();
- byte value = 3;
- valueWritable.set(value);
- writer.append(keyWritable, valueWritable);
- writer.sync();
- writer.close();
+ ByteWritable valueWritable = new ByteWritable((byte) 3);
+ try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, ByteWritable.class)) {
+ writer.append(keyWritable, valueWritable);
+ writer.sync();
+ }
MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(1);
@@ -323,7 +310,7 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
context.addRoutes(new RouteBuilder() {
public void configure() {
- from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+ fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
.to("mock:result");
}
});
@@ -334,23 +321,20 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
@Test
public void testReadFloat() throws Exception {
- final Path file = new Path(new File("target/test/test-camel-float").getAbsolutePath());
- Configuration conf = new Configuration();
- SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, FloatWritable.class);
+ final String file = "test-camel-float";
NullWritable keyWritable = NullWritable.get();
- FloatWritable valueWritable = new FloatWritable();
- float value = 3.1415926535f;
- valueWritable.set(value);
- writer.append(keyWritable, valueWritable);
- writer.sync();
- writer.close();
+ FloatWritable valueWritable = new FloatWritable(3.1415926535f);
+ try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, FloatWritable.class)) {
+ writer.append(keyWritable, valueWritable);
+ writer.sync();
+ }
MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(1);
context.addRoutes(new RouteBuilder() {
public void configure() {
- from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+ fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
.to("mock:result");
}
});
@@ -361,23 +345,20 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
@Test
public void testReadDouble() throws Exception {
- final Path file = new Path(new File("target/test/test-camel-double").getAbsolutePath());
- Configuration conf = new Configuration();
- SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, DoubleWritable.class);
+ final String file = "test-camel-double";
NullWritable keyWritable = NullWritable.get();
- DoubleWritable valueWritable = new DoubleWritable();
- double value = 3.1415926535;
- valueWritable.set(value);
- writer.append(keyWritable, valueWritable);
- writer.sync();
- writer.close();
+ DoubleWritable valueWritable = new DoubleWritable(3.1415926535);
+ try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, DoubleWritable.class)) {
+ writer.append(keyWritable, valueWritable);
+ writer.sync();
+ }
MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(1);
context.addRoutes(new RouteBuilder() {
public void configure() {
- from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+ fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
.to("mock:result");
}
});
@@ -388,23 +369,20 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
@Test
public void testReadInt() throws Exception {
- final Path file = new Path(new File("target/test/test-camel-int").getAbsolutePath());
- Configuration conf = new Configuration();
- SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, IntWritable.class);
+ final String file = "test-camel-int";
NullWritable keyWritable = NullWritable.get();
- IntWritable valueWritable = new IntWritable();
- int value = 314159265;
- valueWritable.set(value);
- writer.append(keyWritable, valueWritable);
- writer.sync();
- writer.close();
+ IntWritable valueWritable = new IntWritable(314159265);
+ try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, IntWritable.class)) {
+ writer.append(keyWritable, valueWritable);
+ writer.sync();
+ }
MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(1);
context.addRoutes(new RouteBuilder() {
public void configure() {
- from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+ fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
.to("mock:result");
}
});
@@ -415,23 +393,20 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
@Test
public void testReadLong() throws Exception {
- final Path file = new Path(new File("target/test/test-camel-long").getAbsolutePath());
- Configuration conf = new Configuration();
- SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, LongWritable.class);
+ final String file = "test-camel-long";
NullWritable keyWritable = NullWritable.get();
- LongWritable valueWritable = new LongWritable();
- long value = 31415926535L;
- valueWritable.set(value);
- writer.append(keyWritable, valueWritable);
- writer.sync();
- writer.close();
+ LongWritable valueWritable = new LongWritable(31415926535L);
+ try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, LongWritable.class)) {
+ writer.append(keyWritable, valueWritable);
+ writer.sync();
+ }
MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(1);
context.addRoutes(new RouteBuilder() {
public void configure() {
- from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+ fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
.to("mock:result");
}
});
@@ -442,23 +417,20 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
@Test
public void testReadBytes() throws Exception {
- final Path file = new Path(new File("target/test/test-camel-bytes").getAbsolutePath());
- Configuration conf = new Configuration();
- SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BytesWritable.class);
+ final String file = "test-camel-bytes";
NullWritable keyWritable = NullWritable.get();
- BytesWritable valueWritable = new BytesWritable();
- String value = "CIAO!";
- valueWritable.set(value.getBytes(), 0, value.getBytes().length);
- writer.append(keyWritable, valueWritable);
- writer.sync();
- writer.close();
+ BytesWritable valueWritable = new BytesWritable("CIAO!".getBytes());
+ try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, BytesWritable.class)) {
+ writer.append(keyWritable, valueWritable);
+ writer.sync();
+ }
MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(1);
context.addRoutes(new RouteBuilder() {
public void configure() {
- from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+ fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
.to("mock:result");
}
});
@@ -469,23 +441,20 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
@Test
public void testReadString() throws Exception {
- final Path file = new Path(new File("target/test/test-camel-string").getAbsolutePath());
- Configuration conf = new Configuration();
- SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, Text.class);
+ final String file = "test-camel-string";
NullWritable keyWritable = NullWritable.get();
- Text valueWritable = new Text();
- String value = "CIAO!";
- valueWritable.set(value);
- writer.append(keyWritable, valueWritable);
- writer.sync();
- writer.close();
+ Text valueWritable = new Text("CIAO!");
+ try (SequenceFile.Writer writer = createWriter(conf, new Path(dir, file), NullWritable.class, Text.class)) {
+ writer.append(keyWritable, valueWritable);
+ writer.sync();
+ }
MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(1);
context.addRoutes(new RouteBuilder() {
public void configure() {
- from("hdfs:localhost/" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0")
+ fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=SEQUENCE_FILE&initialDelay=0", dir.toUri(), file)
.to("mock:result");
}
});
@@ -496,27 +465,18 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
@Test
public void testReadStringArrayFile() throws Exception {
- final Path file = new Path(new File("target/test/test-camel-string").getAbsolutePath());
- Configuration conf = new Configuration();
- FileSystem fs1 = FileSystem.get(file.toUri(), conf);
- ArrayFile.Writer writer = new ArrayFile.Writer(
- conf, fs1, "target/test/test-camel-string1", Text.class, CompressionType.NONE, new Progressable() {
- @Override
- public void progress() {
- }
- });
- Text valueWritable = new Text();
- String value = "CIAO!";
- valueWritable.set(value);
- writer.append(valueWritable);
- writer.close();
+ final String file = "test-camel-string";
+ Text valueWritable = new Text("CIAO!");
+ try (ArrayFile.Writer writer = new ArrayFile.Writer(conf, fs, dir.toUri() + "/" + file, Text.class)) {
+ writer.append(valueWritable);
+ }
MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
resultEndpoint.expectedMessageCount(1);
context.addRoutes(new RouteBuilder() {
public void configure() {
- from("hdfs:localhost/" + file.getParent().toUri() + "?fileSystemType=LOCAL&fileType=ARRAY_FILE&initialDelay=0")
+ fromF("%s?pattern=%s&fileSystemType=HDFS&fileType=ARRAY_FILE&initialDelay=0", dir.toUri(), file)
.to("mock:result");
}
});
@@ -529,11 +489,9 @@ public class HdfsConsumerIntegrationIT extends HdfsTestSupport {
@AfterEach
public void tearDown() throws Exception {
super.tearDown();
- Thread.sleep(100);
- Configuration conf = new Configuration();
- Path dir = new Path("target/test");
- FileSystem fs = FileSystem.get(dir.toUri(), conf);
- fs.delete(dir, true);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
}
private Writer createWriter(
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationManualIT.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationIT.java
similarity index 83%
rename from components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationManualIT.java
rename to components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationIT.java
index b2351938838..bce92eb8021 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationManualIT.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationIT.java
@@ -31,23 +31,34 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
-@Disabled("Must run manual")
-public class HdfsProducerConsumerIntegrationManualIT extends CamelTestSupport {
+public class HdfsProducerConsumerIntegrationIT extends CamelTestSupport {
private static final int ITERATIONS = 400;
+ private MiniDFSCluster cluster;
+
@Override
public boolean isUseRouteBuilder() {
return false;
}
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ Configuration conf = new Configuration();
+ conf.set("dfs.namenode.fs-limits.max-directory-items", "1048576");
+ cluster = new MiniDFSCluster.Builder(conf).nameNodePort(9000).numDataNodes(3).format(true).build();
+ cluster.waitActive();
+ }
+
@Test
public void testSimpleSplitWriteRead() throws Exception {
context.addRoutes(new RouteBuilder() {
@@ -79,7 +90,7 @@ public class HdfsProducerConsumerIntegrationManualIT extends CamelTestSupport {
String text = exchange.getIn().getBody(String.class);
sent.remove(text);
}
- assertThat(sent.isEmpty(), is(true));
+ assertTrue(sent.isEmpty());
}
@Test
@@ -90,9 +101,9 @@ public class HdfsProducerConsumerIntegrationManualIT extends CamelTestSupport {
FileSystem fs = FileSystem.get(p.toUri(), new Configuration());
fs.mkdirs(p);
for (int i = 1; i <= ITERATIONS; i++) {
- FSDataOutputStream os = fs.create(new Path(p, String.format("file-%03d.txt", i)));
- os.write(String.format("hello (%03d)\n", i).getBytes());
- os.close();
+ try (FSDataOutputStream os = fs.create(new Path(p, String.format("file-%03d.txt", i)))) {
+ os.write(String.format("hello (%03d)\n", i).getBytes());
+ }
}
final Set<String> fileNames = new HashSet<>();
@@ -127,20 +138,16 @@ public class HdfsProducerConsumerIntegrationManualIT extends CamelTestSupport {
latch.await(30, TimeUnit.SECONDS);
resultEndpoint.assertIsSatisfied();
- assertThat(fileNames.size(), equalTo(ITERATIONS));
+ assertEquals(fileNames.size(), ITERATIONS);
}
@Override
@AfterEach
public void tearDown() throws Exception {
super.tearDown();
-
- Thread.sleep(250);
- Configuration conf = new Configuration();
- Path dir = new Path("hdfs://localhost:9000/tmp/test");
- FileSystem fs = FileSystem.get(dir.toUri(), conf);
- fs.delete(dir, true);
- fs.delete(new Path("hdfs://localhost:9000/tmp/test/multiple-consumers"), true);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
}
}