You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/12/05 15:06:10 UTC

[camel-kafka-connector] branch master updated: Prevent the file test from failing when the test file has been created but no content has been written

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 61ba7a7  Prevent the file test from failing when the test file has been created but no content has been written
     new 6456a82  Merge pull request #6 from orpiske/fix-camel-file-test
61ba7a7 is described below

commit 61ba7a736f94579fb161f0659a47c022ab51cb45
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Thu Dec 5 15:09:12 2019 +0100

    Prevent the file test from failing when the test file has been created but no content has been written
---
 .../sink/file/CamelSinkFileITCase.java             | 51 ++++++++++++++--------
 1 file changed, 34 insertions(+), 17 deletions(-)

diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/file/CamelSinkFileITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/file/CamelSinkFileITCase.java
index a6fb99b..f817dc4 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/file/CamelSinkFileITCase.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/file/CamelSinkFileITCase.java
@@ -53,6 +53,7 @@ public class CamelSinkFileITCase {
     private static final String SINK_DIR = CamelSinkFileITCase.class.getResource(".").getPath();
     private static final String FILENAME = "test.txt";
 
+
     @Rule
     public KafkaContainer kafka = new KafkaContainer().withEmbeddedZookeeper();
 
@@ -66,9 +67,14 @@ public class CamelSinkFileITCase {
         ContainerUtil.waitForInitialization(kafka);
         LOG.info("Kafka bootstrap server running at address {}", kafka.getBootstrapServers());
 
-        String url = "file://" + SINK_DIR + "?fileName=" + FILENAME;
+        String url = "file://" + SINK_DIR + "?fileName=" + FILENAME + "&doneFileName=${file:name}.done";
         LOG.debug("Saving files to {}", url);
 
+        File doneFile = new File(SINK_DIR, FILENAME + ".done");
+        if (doneFile.exists()) {
+            doneFile.delete();
+        }
+
         ConnectorPropertyFactory testProperties = new CamelFilePropertyFactory(1,
                 TestCommon.DEFAULT_TEST_TOPIC, url);
 
@@ -103,23 +109,14 @@ public class CamelSinkFileITCase {
             LOG.debug("Created the consumer ... About to receive messages");
 
             File sinkFile = new File(SINK_DIR, FILENAME);
-            waitForFile(sinkFile);
+            File doneFile = new File(SINK_DIR, FILENAME + ".done");
 
-            Assert.assertTrue(String.format("The file %s does not exist", sinkFile.getPath()), sinkFile.exists());
+            waitForFile(sinkFile, doneFile);
 
-            BufferedReader reader = new BufferedReader(new FileReader(sinkFile));
+            Assert.assertTrue(String.format("The file %s does not exist", sinkFile.getPath()), sinkFile.exists());
 
-            int i = 0;
-            String line;
-            do {
-                line = reader.readLine();
-                if (line != null) {
-                    Assert.assertEquals(String.format("Unexpected data: %s", line), "test", line);
-                    i++;
-                }
-            } while (line != null);
+            checkFileContents(sinkFile);
 
-            Assert.assertEquals("Did not receive the same amount of messages that were sent", expect, i);
         } catch (Exception e) {
             LOG.error("HTTP test failed: {}", e.getMessage(), e);
             fail(e.getMessage());
@@ -128,10 +125,30 @@ public class CamelSinkFileITCase {
         }
     }
 
-    private void waitForFile(File sinkFile) throws IOException, InterruptedException {
+    private void checkFileContents(File sinkFile) throws IOException {
+        BufferedReader reader = new BufferedReader(new FileReader(sinkFile));
+
+        int i = 0;
+        String line;
+        do {
+            line = reader.readLine();
+            if (line != null) {
+                Assert.assertEquals(String.format("Unexpected data: %s", line), "test", line);
+                i++;
+            }
+        } while (line != null);
+
+        Assert.assertEquals("Did not receive the same amount of messages that were sent", expect, i);
+    }
+
+    private void waitForFile(File sinkFile, File doneFile) throws IOException, InterruptedException {
         WatchService watchService = FileSystems.getDefault().newWatchService();
         Path path = sinkFile.getParentFile().toPath();
 
+        if (doneFile.exists()) {
+            return;
+        }
+
         // We watch for both the file creation and truncation
         path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY);
 
@@ -156,7 +173,7 @@ public class CamelSinkFileITCase {
 
                 Path contextPath = (Path) context;
 
-                if (contextPath.toString().equals(sinkFile.getName())) {
+                if (contextPath.toString().equals(doneFile.getName())) {
                     LOG.info("Sink file at the build path {} had a matching event of type: {}", sinkFile.getPath(),
                             event.kind());
 
@@ -168,6 +185,6 @@ public class CamelSinkFileITCase {
             }
             watchKey.reset();
             retries--;
-        } while (!sinkFile.exists() && retries > 0);
+        } while (!doneFile.exists() && retries > 0);
     }
 }