You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/11/27 07:49:40 UTC

[camel] 01/03: CAMEL-12020: Added polling consumer for file/ftp components that poll on-demand and do not use a background scheduler.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 21807defcfcbc40feafd80de68c5e0601a893ff8
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Nov 26 19:37:26 2017 +0100

    CAMEL-12020: Added polling consumer for file/ftp components that poll on-demand and do not use a background scheduler.
---
 .../apache/camel/component/file/FileEndpoint.java  |  19 ++
 .../camel/component/file/GenericFileConsumer.java  |  19 +-
 .../component/file/GenericFilePollingConsumer.java | 229 +++++++++++++++++++++
 .../camel/impl/EventDrivenPollingConsumer.java     |  10 +-
 .../camel/component/test/TestFileSplitTest.java    |   2 +-
 .../camel/impl/ConsumerCacheZeroCapacityTest.java  |   2 +-
 .../enricher/PollEnrichBridgeErrorHandlerTest.java |   3 +-
 .../camel/component/file/remote/FtpEndpoint.java   |   2 +-
 .../component/file/remote/RemoteFileEndpoint.java  |  16 ++
 .../camel/component/file/remote/SftpEndpoint.java  |   1 -
 ...llEnrichConsumeWithDisconnectAndDeleteTest.java |  77 +++++++
 11 files changed, 373 insertions(+), 7 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
index 4b61d6c..9bfdc41 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
@@ -26,7 +26,9 @@ import java.util.Set;
 import org.apache.camel.Component;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.EventDrivenPollingConsumer;
 import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
@@ -119,6 +121,23 @@ public class FileEndpoint extends GenericFileEndpoint<File> {
         return result;
     }
 
+    @Override
+    public PollingConsumer createPollingConsumer() throws Exception {
+        ObjectHelper.notNull(operations, "operations");
+        ObjectHelper.notNull(file, "file");
+
+        if (log.isDebugEnabled()) {
+            log.debug("Creating GenericFilePollingConsumer with queueSize: {} blockWhenFull: {} blockTimeout: {}",
+                getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull(), getPollingConsumerBlockTimeout());
+        }
+        GenericFilePollingConsumer result = new GenericFilePollingConsumer(this);
+        // should not call configurePollingConsumer when it is a GenericFilePollingConsumer
+        // NOTE(review): the queue size is logged above but only blockWhenFull/blockTimeout are applied here - confirm
+        result.setBlockWhenFull(isPollingConsumerBlockWhenFull());
+        result.setBlockTimeout(getPollingConsumerBlockTimeout());
+
+        return result;
+    }
+
     public GenericFileProducer<File> createProducer() throws Exception {
         ObjectHelper.notNull(operations, "operations");
 
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index b6d7d67..619bf6b 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -93,7 +93,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
     /**
      * Poll for files
      */
-    protected int poll() throws Exception {
+    public int poll() throws Exception {
         // must prepare on startup the very first time
         if (!prepareOnStartup) {
             // prepare on startup
@@ -720,4 +720,21 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
         prepareOnStartup = false;
         super.doStop();
     }
+
+    @Override
+    public void onInit() throws Exception {
+        // noop as we do a manual on-demand poll with GenericFilePollingConsumer
+    }
+
+    @Override
+    public long beforePoll(long timeout) throws Exception {
+        // noop as we do a manual on-demand poll with GenericFilePollingConsumer
+        return timeout;
+    }
+
+    @Override
+    public void afterPoll() throws Exception {
+        // noop as we do a manual on-demand poll with GenericFilePollingConsumer
+    }
+
 }
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java
new file mode 100644
index 0000000..09f6bcf
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.EventDrivenPollingConsumer;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.spi.PollingConsumerPollStrategy;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericFilePollingConsumer extends EventDrivenPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GenericFilePollingConsumer.class);
+
+    private final long delay;
+
+    public GenericFilePollingConsumer(GenericFileEndpoint endpoint) throws Exception {
+        super(endpoint);
+        this.delay = endpoint.getDelay();
+    }
+
+    @Override
+    protected Consumer createConsumer() throws Exception {
+        // let the parent create the consumer; the cast assumes the endpoint creates a GenericFileConsumer
+        GenericFileConsumer consumer = (GenericFileConsumer) super.createConsumer();
+        // do not start scheduler as we poll manually
+        consumer.setStartScheduler(false);
+        consumer.setMaxMessagesPerPoll(1);
+        // NOTE(review): an earlier comment said "disconnect by default", but no disconnect option is set here - confirm
+        return consumer;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        // ensure consumer is started
+        ServiceHelper.startService(getConsumer());
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+    }
+
+    @Override
+    protected GenericFileConsumer getConsumer() {
+        return (GenericFileConsumer) super.getConsumer();
+    }
+
+    @Override
+    public Exchange receiveNoWait() {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("receiveNoWait polling file: {}", getConsumer().getEndpoint());
+        }
+        int polled = doReceive(0);
+        if (polled > 0) {
+            return super.receive(0);
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public Exchange receive() {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("receive polling file: {}", getConsumer().getEndpoint());
+        }
+        int polled = doReceive(Long.MAX_VALUE);
+        if (polled > 0) {
+            return super.receive();
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public Exchange receive(long timeout) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("receive({}) polling file: {}", timeout, getConsumer().getEndpoint());
+        }
+        int polled = doReceive(timeout);
+        if (polled > 0) {
+            return super.receive(timeout);
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Polls the underlying file consumer on-demand, driving its poll strategy (begin/commit/rollback).
+     * <p/>
+     * When nothing was polled and a positive timeout is given, the poll is repeated (sleeping at most
+     * <tt>delay</tt> millis between attempts) until a file is polled, the timeout is hit, or the
+     * calling thread is interrupted. A failure that the poll strategy does not want to retry is
+     * rethrown via {@link ObjectHelper#wrapRuntimeCamelException(Throwable)}.
+     *
+     * @param timeout time in millis to keep trying; when zero or negative an empty poll is not repeated
+     * @return the number of messages polled by the last poll attempt
+     */
+    protected int doReceive(long timeout) {
+        int retryCounter = -1;
+        boolean done = false;
+        Throwable cause = null;
+        int polledMessages = 0;
+        PollingConsumerPollStrategy pollStrategy = getConsumer().getPollStrategy();
+        boolean sendEmptyMessageWhenIdle = getConsumer() instanceof ScheduledBatchPollingConsumer && getConsumer().isSendEmptyMessageWhenIdle();
+        StopWatch watch = new StopWatch();
+
+        while (!done) {
+            try {
+                cause = null;
+                // eager assume we are done
+                done = true;
+                if (isRunAllowed()) {
+
+                    if (retryCounter == -1) {
+                        LOG.trace("Starting to poll: {}", this.getEndpoint());
+                    } else {
+                        LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint());
+                    }
+
+                    // mark we are polling which should also include the begin/poll/commit
+                    boolean begin = pollStrategy.begin(this, getEndpoint());
+                    if (begin) {
+                        retryCounter++;
+                        polledMessages = getConsumer().poll();
+                        LOG.trace("Polled {} messages", polledMessages);
+
+                        if (polledMessages == 0 && sendEmptyMessageWhenIdle) {
+                            // send an "empty" exchange
+                            processEmptyMessage();
+                        } else if (polledMessages == 0 && timeout > 0) {
+                            // if we did not poll a file and we are using timeout then try to poll again
+                            done = false;
+                        }
+
+                        pollStrategy.commit(this, getEndpoint(), polledMessages);
+                    } else {
+                        LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
+                    }
+                }
+
+                LOG.trace("Finished polling: {}", this.getEndpoint());
+            } catch (Exception e) {
+                try {
+                    boolean retry = pollStrategy.rollback(this, getEndpoint(), retryCounter, e);
+                    if (retry) {
+                        // do not set cause as we retry
+                        done = false;
+                    } else {
+                        cause = e;
+                        done = true;
+                    }
+                } catch (Throwable t) {
+                    cause = t;
+                    done = true;
+                }
+            } catch (Throwable t) {
+                cause = t;
+                done = true;
+            }
+
+            if (!done && timeout > 0) {
+                // prepare for next attempt until we hit timeout
+                long left = timeout - watch.taken();
+                long min = Math.min(left, delay);
+                if (min > 0) {
+                    try {
+                        // sleep for next poll
+                        sleep(min);
+                    } catch (InterruptedException e) {
+                        // restore the interrupt flag and stop polling, otherwise the next
+                        // sleep would fail immediately and we would spin until the timeout
+                        Thread.currentThread().interrupt();
+                        done = true;
+                    }
+                } else {
+                    // timeout hit
+                    done = true;
+                }
+            }
+        }
+
+        if (cause != null) {
+            throw ObjectHelper.wrapRuntimeCamelException(cause);
+        }
+
+        return polledMessages;
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        Object name = exchange.getIn().getHeader(Exchange.FILE_NAME);
+        if (name != null) {
+            LOG.debug("Received file: {}", name);
+        }
+        super.process(exchange);
+    }
+
+    /**
+     * No messages to poll so send an empty message instead.
+     * <p/>
+     * Creates a new exchange from the endpoint and hands it to {@link #process(Exchange)}.
+     *
+     * @throws Exception is thrown if error processing the empty message.
+     */
+    protected void processEmptyMessage() throws Exception {
+        Exchange exchange = getEndpoint().createExchange();
+        // use the same static logger as the rest of this class
+        LOG.debug("Sending empty message as there were no messages from polling: {}", this.getEndpoint());
+        process(exchange);
+    }
+
+    private static void sleep(long delay) throws InterruptedException {
+        if (delay <= 0) {
+            return;
+        }
+        LOG.trace("Sleeping for: {} millis", delay);
+        Thread.sleep(delay);
+    }
+
+}
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
index df00e0d..066abf4 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
@@ -211,9 +211,17 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
         }
     }
 
+    protected Consumer getConsumer() {
+        return consumer;
+    }
+
+    protected Consumer createConsumer() throws Exception {
+        return getEndpoint().createConsumer(this);
+    }
+
     protected void doStart() throws Exception {
         // lets add ourselves as a consumer
-        consumer = getEndpoint().createConsumer(this);
+        consumer = createConsumer();
 
         // if the consumer has a polling strategy then invoke that
         if (consumer instanceof PollingConsumerPollingStrategy) {
diff --git a/camel-core/src/test/java/org/apache/camel/component/test/TestFileSplitTest.java b/camel-core/src/test/java/org/apache/camel/component/test/TestFileSplitTest.java
index 648e571..1732b0f 100644
--- a/camel-core/src/test/java/org/apache/camel/component/test/TestFileSplitTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/test/TestFileSplitTest.java
@@ -41,7 +41,7 @@ public class TestFileSplitTest extends ContextTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                        .to("test:file:target/testme?noop=true&split=true");
+                        .to("test:file:target/testme?noop=true&split=true&timeout=1000");
             }
         });
         context.start();
diff --git a/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java b/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
index 30f38a8..31cd3c1 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
@@ -45,7 +45,7 @@ public class ConsumerCacheZeroCapacityTest extends ContextTestSupport {
         consumer.receive(50);
 
         boolean found = Thread.getAllStackTraces().keySet().stream().anyMatch(t -> t.getName().contains("target/foo"));
-        assertTrue("Should find file consumer thread", found);
+        assertFalse("Should not find file consumer thread", found);
 
         cache.releasePollingConsumer(endpoint, consumer);
 
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java
index 4d6d287..3b417c5 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java
@@ -50,7 +50,8 @@ public class PollEnrichBridgeErrorHandlerTest extends ContextTestSupport {
 
         Exception caught = getMockEndpoint("mock:dead").getExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
         assertNotNull(caught);
-        assertEquals("Something went wrong", caught.getMessage());
+        assertTrue(caught.getMessage().startsWith("Error during poll"));
+        assertEquals("Something went wrong", caught.getCause().getCause().getMessage());
     }
 
     @Override
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
index 45406f8..846500e 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
@@ -83,7 +83,7 @@ public class FtpEndpoint<T extends FTPFile> extends RemoteFileEndpoint<FTPFile>
             throw new FailedToCreateProducerException(this, e);
         }
     }
-    
+
     public RemoteFileOperations<FTPFile> createRemoteFileOperations() throws Exception {
         // configure ftp client
         FTPClient client = ftpClient;
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
index 884678a..a037afd 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
@@ -19,10 +19,12 @@ package org.apache.camel.component.file.remote;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.camel.component.file.GenericFileEndpoint;
 import org.apache.camel.component.file.GenericFileExist;
+import org.apache.camel.component.file.GenericFilePollingConsumer;
 import org.apache.camel.component.file.GenericFileProducer;
 import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
 import org.apache.camel.spi.UriParam;
@@ -128,6 +130,20 @@ public abstract class RemoteFileEndpoint<T> extends GenericFileEndpoint<T> {
         return consumer;
     }
 
+    @Override
+    public PollingConsumer createPollingConsumer() throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("Creating GenericFilePollingConsumer with queueSize: {} blockWhenFull: {} blockTimeout: {}",
+                getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull(), getPollingConsumerBlockTimeout());
+        }
+        GenericFilePollingConsumer result = new GenericFilePollingConsumer(this);
+        // should not call configurePollingConsumer when it is a GenericFilePollingConsumer
+        // NOTE(review): the queue size is logged above but only blockWhenFull/blockTimeout are applied here - confirm
+        result.setBlockWhenFull(isPollingConsumerBlockWhenFull());
+        result.setBlockTimeout(getPollingConsumerBlockTimeout());
+
+        return result;
+    }
+
     /**
      * Validates this endpoint if its configured properly.
      *
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
index 8bfeb4f..42341bc 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.file.remote;
 
-import com.jcraft.jsch.ChannelSftp;
 import com.jcraft.jsch.Proxy;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFileConfiguration;
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollEnrichConsumeWithDisconnectAndDeleteTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollEnrichConsumeWithDisconnectAndDeleteTest.java
new file mode 100644
index 0000000..258cb66
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollEnrichConsumeWithDisconnectAndDeleteTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class FtpPollEnrichConsumeWithDisconnectAndDeleteTest extends FtpServerTestSupport {
+
+    @Test
+    public void testFtpSimpleConsume() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        String expected = "Hello World";
+
+        // create file using regular file
+        template.sendBodyAndHeader("file://" + FTP_ROOT_DIR + "/poll", expected, Exchange.FILE_NAME, "hello.txt");
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt");
+        mock.expectedBodiesReceived(expected);
+
+        ProducerTemplate triggerTemplate = context.createProducerTemplate();
+        triggerTemplate.sendBody("vm:trigger", "");
+
+        assertMockEndpointsSatisfied();
+
+        // wait up to 3000ms for the file to be deleted, and stop waiting as soon as it is gone
+        // (otherwise the loop would busy-spin without sleeping until the full 3000ms has elapsed)
+        long startFileDeletionCheckTime = System.currentTimeMillis();
+        boolean fileExists = true;
+        while (fileExists && System.currentTimeMillis() - startFileDeletionCheckTime < 3000) {
+            File file = new File(FTP_ROOT_DIR + "/poll/hello.txt");
+            fileExists = file.exists();
+
+            if (fileExists) {
+                log.info("Will check that file has been deleted again in 200ms");
+                Thread.sleep(200);
+            }
+        }
+
+        assertFalse("The file should have been deleted", fileExists);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("vm:trigger")
+                    .pollEnrich("ftp://admin@localhost:" + getPort() + "/poll?password=admin&delete=true")
+                    .routeId("foo")
+                    .to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.