You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/01/11 11:25:24 UTC

[camel] branch master updated: CAMEL-12112: Added unit test thanks to Bhuvan Gupta for the test. Validate the readLock option is configured with a valid name if in use. Fixed missing default option in the docs. Increased the in-progress cache to 50000 as 1000 was likely too small anyway.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new a8271e5  CAMEL-12112: Added unit test thanks to Bhuvan Gupta for the test. Validate the readLock option is configured with a valid name if in use. Fixed missing default option in the docs. Increased the in-progress cache to 50000 as 1000 was likely too small anyway.
a8271e5 is described below

commit a8271e513fd8a2783e2ed7dd13790ebaa2ee8065
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jan 11 12:21:07 2018 +0100

    CAMEL-12112: Added unit test thanks to Bhuvan Gupta for the test. Validate the readLock option is configured with a valid name if in use. Fixed missing default option in the docs. Increased the in-progress cache to 50000 as 1000 was likely too small anyway.
---
 camel-core/src/main/docs/file-component.adoc       |   2 +-
 .../apache/camel/component/file/FileEndpoint.java  |  12 ++-
 .../camel/component/file/GenericFileEndpoint.java  |   5 +-
 .../FileConsumerThreadsInProgressIssueTest.java    | 112 +++++++++++++++++++++
 4 files changed, 127 insertions(+), 4 deletions(-)

diff --git a/camel-core/src/main/docs/file-component.adoc b/camel-core/src/main/docs/file-component.adoc
index cfc013f..9fbd3ed 100644
--- a/camel-core/src/main/docs/file-component.adoc
+++ b/camel-core/src/main/docs/file-component.adoc
@@ -129,7 +129,7 @@ with the following path and query parameters:
 | *minDepth* (filter) | The minimum depth to start processing when recursively processing a directory. Using minDepth=1 means the base directory. Using minDepth=2 means the first sub directory. |  | int
 | *move* (filter) | Expression (such as Simple Language) used to dynamically set the filename when moving it after processing. To move files into a .done subdirectory just enter .done. |  | String
 | *exclusiveReadLockStrategy* (lock) | Pluggable read-lock as a org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy implementation. |  | GenericFileExclusive ReadLockStrategy<T>
-| *readLock* (lock) | Used by consumer to only poll the files if it has exclusive read-lock on the file (i.e. the file is not in-progress or being written). Camel will wait until the file lock is granted. This option provides the build in strategies: none - No read lock is in use markerFile - Camel creates a marker file (fileName.camelLock) and then holds a lock on it. This option is not available for the FTP component changed - Changed is using file length/modification timestamp to dete [...]
+| *readLock* (lock) | Used by consumer to only poll the files if it has exclusive read-lock on the file (i.e. the file is not in-progress or being written). Camel will wait until the file lock is granted. This option provides the build in strategies: none - No read lock is in use markerFile - Camel creates a marker file (fileName.camelLock) and then holds a lock on it. This option is not available for the FTP component changed - Changed is using file length/modification timestamp to dete [...]
 | *readLockCheckInterval* (lock) | Interval in millis for the read-lock if supported by the read lock. This interval is used for sleeping between attempts to acquire the read lock. For example when using the changed read lock you can set a higher interval period to cater for slow writes. The default of 1 sec. may be too fast if the producer is very slow writing the file. Notice: For FTP the default readLockCheckInterval is 5000. The readLockTimeout value must be higher than readLockCheck [...]
 | *readLockDeleteOrphanLock Files* (lock) | Whether or not read lock with marker files should upon startup delete any orphan read lock files which may have been left on the file system if Camel was not properly shutdown (such as a JVM crash). If turning this option to false then any orphaned lock file will cause Camel to not attempt to pickup that file this could also be due another node is concurrently reading files from the same shared directory. | true | boolean
 | *readLockLoggingLevel* (lock) | Logging level used when a read lock could not be acquired. By default a WARN is logged. You can change this level for example to OFF to not have any logging. This option is only applicable for readLock of types: changed fileLock idempotent idempotent-changed idempotent-rename rename. | DEBUG | LoggingLevel
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
index 9bfdc41..6f22330 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.nio.file.Files;
 import java.nio.file.attribute.PosixFilePermission;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -28,7 +29,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.EventDrivenPollingConsumer;
 import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
@@ -113,6 +113,16 @@ public class FileEndpoint extends GenericFileEndpoint<File> {
             idempotentRepository = MemoryIdempotentRepository.memoryIdempotentRepository(DEFAULT_IDEMPOTENT_CACHE_SIZE);
         }
 
+        if (ObjectHelper.isNotEmpty(getReadLock())) {
+            // check if its a valid
+            String valid = "none,markerFile,fileLock,rename,changed,idempotent,idempotent-changed,idempotent-rename";
+            String[] arr = valid.split(",");
+            boolean matched = Arrays.stream(arr).anyMatch(n -> n.equals(getReadLock()));
+            if (!matched) {
+                throw new IllegalArgumentException("ReadLock invalid: " + getReadLock() + ", must be one of: " + valid);
+            }
+        }
+
         // set max messages per poll
         result.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
         result.setEagerLimitMaxMessagesPerPoll(isEagerMaxMessagesPerPoll());
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
index 22e41df..1e90033 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileEndpoint.java
@@ -58,6 +58,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
 
     protected static final String DEFAULT_STRATEGYFACTORY_CLASS = "org.apache.camel.component.file.strategy.GenericFileProcessStrategyFactory";
     protected static final int DEFAULT_IDEMPOTENT_CACHE_SIZE = 1000;
+    protected static final int DEFAULT_IN_PROGRESS_CACHE_SIZE = 50000;
 
     protected final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -98,7 +99,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
     @UriParam(label = "consumer,advanced")
     protected GenericFileProcessStrategy<T> processStrategy;
     @UriParam(label = "consumer,advanced")
-    protected IdempotentRepository<String> inProgressRepository = new MemoryIdempotentRepository();
+    protected IdempotentRepository<String> inProgressRepository = MemoryIdempotentRepository.memoryIdempotentRepository(DEFAULT_IN_PROGRESS_CACHE_SIZE);
     @UriParam(label = "consumer,advanced")
     protected String localWorkDirectory;
     @UriParam(label = "consumer,advanced")
@@ -158,7 +159,7 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
     protected Comparator<Exchange> sortBy;
     @UriParam(label = "consumer,sort")
     protected boolean shuffle;
-    @UriParam(label = "consumer,lock", enums = "none,markerFile,fileLock,rename,changed,idempotent,idempotent-changed,idempotent-rename")
+    @UriParam(label = "consumer,lock", defaultValue = "none", enums = "none,markerFile,fileLock,rename,changed,idempotent,idempotent-changed,idempotent-rename")
     protected String readLock = "none";
     @UriParam(label = "consumer,lock", defaultValue = "1000")
     protected long readLockCheckInterval = 1000;
diff --git a/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerThreadsInProgressIssueTest.java b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerThreadsInProgressIssueTest.java
new file mode 100644
index 0000000..be50c61
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerThreadsInProgressIssueTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class FileConsumerThreadsInProgressIssueTest extends ContextTestSupport {
+
+    private final Map<String, Integer> duplicate = new HashMap<>();
+    private final SampleProcessor processor = new SampleProcessor(duplicate);
+
+    private int number = 2000;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("file:target/manyfiles?sortBy=file:name&delay=10&synchronous=false").routeId("myRoute").noAutoStartup()
+                    .threads(1, 10).maxQueueSize(0)
+                    .convertBodyTo(String.class)
+                    .process(processor)
+                    .to("log:done", "mock:done");
+            }
+        };
+    }
+
+    public void testFileConsumerThreadsInProgressIssue() throws Exception {
+        // give longer timeout for stopping
+        context.getShutdownStrategy().setTimeout(180);
+
+        MockEndpoint mock = getMockEndpoint("mock:done");
+        mock.expectedMessageCount(number);
+        mock.expectsNoDuplicates(body());
+
+        createManyFiles(number);
+
+        context.startRoute("myRoute");
+
+        mock.setResultWaitTime(180 * 1000);
+        mock.assertIsSatisfied();
+
+        context.stop();
+
+        int found = 0;
+        log.info("=====================");
+        log.info("Printing duplicates");
+        for (Map.Entry<String, Integer> ent : duplicate.entrySet()) {
+            Integer count = ent.getValue();
+            if (count > 1) {
+                found++;
+                log.info(ent.getKey() + " :: " + count);
+            }
+        }
+
+        assertEquals("Should not contain duplicates", 0, found);
+    }
+
+    private static void createManyFiles(int number) throws Exception {
+        deleteDirectory("target/manyfiles");
+        createDirectory("target/manyfiles");
+        for (int i = 0; i < number; i++) {
+            String pad = String.format("%04d", i);
+            PrintWriter writer = new PrintWriter("target/manyfiles/newFile-" + pad, "UTF-8");
+            writer.println(pad);
+            writer.close();
+        }
+    }
+
+    private class SampleProcessor implements Processor {
+        private Map<String, Integer> duplicate;
+
+        public SampleProcessor(Map<String, Integer> duplicate) {
+            this.duplicate = duplicate;
+        }
+
+        public void process(Exchange exchange) throws Exception {
+            Integer integer = duplicate.get(exchange.toString());
+            if (integer == null) {
+                duplicate.put(exchange.toString(), 1);
+            } else {
+                integer++;
+                duplicate.put(exchange.toString(), integer);
+            }
+            log.info("Process called for-" + exchange);
+            Thread.sleep(20);
+        }
+
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <co...@camel.apache.org>'].