You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/12/05 15:06:10 UTC
[camel-kafka-connector] branch master updated: Prevent the file
test from failing when the test file has been created but no content has
been written
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 61ba7a7 Prevent the file test from failing when the test file has been created but no content has been written
new 6456a82 Merge pull request #6 from orpiske/fix-camel-file-test
61ba7a7 is described below
commit 61ba7a736f94579fb161f0659a47c022ab51cb45
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Thu Dec 5 15:09:12 2019 +0100
Prevent the file test from failing when the test file has been created but no content has been written
---
.../sink/file/CamelSinkFileITCase.java | 51 ++++++++++++++--------
1 file changed, 34 insertions(+), 17 deletions(-)
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/file/CamelSinkFileITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/file/CamelSinkFileITCase.java
index a6fb99b..f817dc4 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/file/CamelSinkFileITCase.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/file/CamelSinkFileITCase.java
@@ -53,6 +53,7 @@ public class CamelSinkFileITCase {
private static final String SINK_DIR = CamelSinkFileITCase.class.getResource(".").getPath();
private static final String FILENAME = "test.txt";
+
@Rule
public KafkaContainer kafka = new KafkaContainer().withEmbeddedZookeeper();
@@ -66,9 +67,14 @@ public class CamelSinkFileITCase {
ContainerUtil.waitForInitialization(kafka);
LOG.info("Kafka bootstrap server running at address {}", kafka.getBootstrapServers());
- String url = "file://" + SINK_DIR + "?fileName=" + FILENAME;
+ String url = "file://" + SINK_DIR + "?fileName=" + FILENAME + "&doneFileName=${file:name}.done";
LOG.debug("Saving files to {}", url);
+ File doneFile = new File(SINK_DIR, FILENAME + ".done");
+ if (doneFile.exists()) {
+ doneFile.delete();
+ }
+
ConnectorPropertyFactory testProperties = new CamelFilePropertyFactory(1,
TestCommon.DEFAULT_TEST_TOPIC, url);
@@ -103,23 +109,14 @@ public class CamelSinkFileITCase {
LOG.debug("Created the consumer ... About to receive messages");
File sinkFile = new File(SINK_DIR, FILENAME);
- waitForFile(sinkFile);
+ File doneFile = new File(SINK_DIR, FILENAME + ".done");
- Assert.assertTrue(String.format("The file %s does not exist", sinkFile.getPath()), sinkFile.exists());
+ waitForFile(sinkFile, doneFile);
- BufferedReader reader = new BufferedReader(new FileReader(sinkFile));
+ Assert.assertTrue(String.format("The file %s does not exist", sinkFile.getPath()), sinkFile.exists());
- int i = 0;
- String line;
- do {
- line = reader.readLine();
- if (line != null) {
- Assert.assertEquals(String.format("Unexpected data: %s", line), "test", line);
- i++;
- }
- } while (line != null);
+ checkFileContents(sinkFile);
- Assert.assertEquals("Did not receive the same amount of messages that were sent", expect, i);
} catch (Exception e) {
LOG.error("HTTP test failed: {}", e.getMessage(), e);
fail(e.getMessage());
@@ -128,10 +125,30 @@ public class CamelSinkFileITCase {
}
}
- private void waitForFile(File sinkFile) throws IOException, InterruptedException {
+ private void checkFileContents(File sinkFile) throws IOException {
+ BufferedReader reader = new BufferedReader(new FileReader(sinkFile));
+
+ int i = 0;
+ String line;
+ do {
+ line = reader.readLine();
+ if (line != null) {
+ Assert.assertEquals(String.format("Unexpected data: %s", line), "test", line);
+ i++;
+ }
+ } while (line != null);
+
+ Assert.assertEquals("Did not receive the same amount of messages that were sent", expect, i);
+ }
+
+ private void waitForFile(File sinkFile, File doneFile) throws IOException, InterruptedException {
WatchService watchService = FileSystems.getDefault().newWatchService();
Path path = sinkFile.getParentFile().toPath();
+ if (doneFile.exists()) {
+ return;
+ }
+
// We watch for both the file creation and truncation
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY);
@@ -156,7 +173,7 @@ public class CamelSinkFileITCase {
Path contextPath = (Path) context;
- if (contextPath.toString().equals(sinkFile.getName())) {
+ if (contextPath.toString().equals(doneFile.getName())) {
LOG.info("Sink file at the build path {} had a matching event of type: {}", sinkFile.getPath(),
event.kind());
@@ -168,6 +185,6 @@ public class CamelSinkFileITCase {
}
watchKey.reset();
retries--;
- } while (!sinkFile.exists() && retries > 0);
+ } while (!doneFile.exists() && retries > 0);
}
}