You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/11/27 07:49:40 UTC
[camel] 01/03: CAMEL-12020: Added polling consumer for file/ftp
components that poll on-demand and do not use a background scheduler.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 21807defcfcbc40feafd80de68c5e0601a893ff8
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Nov 26 19:37:26 2017 +0100
CAMEL-12020: Added polling consumer for file/ftp components that poll on-demand and do not use a background scheduler.
---
.../apache/camel/component/file/FileEndpoint.java | 19 ++
.../camel/component/file/GenericFileConsumer.java | 19 +-
.../component/file/GenericFilePollingConsumer.java | 229 +++++++++++++++++++++
.../camel/impl/EventDrivenPollingConsumer.java | 10 +-
.../camel/component/test/TestFileSplitTest.java | 2 +-
.../camel/impl/ConsumerCacheZeroCapacityTest.java | 2 +-
.../enricher/PollEnrichBridgeErrorHandlerTest.java | 3 +-
.../camel/component/file/remote/FtpEndpoint.java | 2 +-
.../component/file/remote/RemoteFileEndpoint.java | 16 ++
.../camel/component/file/remote/SftpEndpoint.java | 1 -
...llEnrichConsumeWithDisconnectAndDeleteTest.java | 77 +++++++
11 files changed, 373 insertions(+), 7 deletions(-)
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
index 4b61d6c..9bfdc41 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
@@ -26,7 +26,9 @@ import java.util.Set;
import org.apache.camel.Component;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
+import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
+import org.apache.camel.impl.EventDrivenPollingConsumer;
import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
@@ -119,6 +121,23 @@ public class FileEndpoint extends GenericFileEndpoint<File> {
return result;
}
+ @Override
+ public PollingConsumer createPollingConsumer() throws Exception {
+ ObjectHelper.notNull(operations, "operations");
+ ObjectHelper.notNull(file, "file");
+
+ if (log.isDebugEnabled()) {
+ log.debug("Creating GenericFilePollingConsumer with queueSize: {} blockWhenFull: {} blockTimeout: {}",
+ getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull(), getPollingConsumerBlockTimeout());
+ }
+ GenericFilePollingConsumer result = new GenericFilePollingConsumer(this);
+ // should not call configurePollingConsumer when it is a GenericFilePollingConsumer
+ result.setBlockWhenFull(isPollingConsumerBlockWhenFull());
+ result.setBlockTimeout(getPollingConsumerBlockTimeout());
+
+ return result;
+ }
+
public GenericFileProducer<File> createProducer() throws Exception {
ObjectHelper.notNull(operations, "operations");
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index b6d7d67..619bf6b 100644
--- a/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -93,7 +93,7 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
/**
* Poll for files
*/
- protected int poll() throws Exception {
+ public int poll() throws Exception {
// must prepare on startup the very first time
if (!prepareOnStartup) {
// prepare on startup
@@ -720,4 +720,21 @@ public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsum
prepareOnStartup = false;
super.doStop();
}
+
+ @Override
+ public void onInit() throws Exception {
+ // noop as we do a manual on-demand poll with GenericFilePollingConsumer
+ }
+
+ @Override
+ public long beforePoll(long timeout) throws Exception {
+ // noop as we do a manual on-demand poll with GenericFilePollingConsumer
+ return timeout;
+ }
+
+ @Override
+ public void afterPoll() throws Exception {
+ // noop as we do a manual on-demand poll with GenericFilePollingConsumer
+ }
+
}
diff --git a/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java b/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java
new file mode 100644
index 0000000..09f6bcf
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/component/file/GenericFilePollingConsumer.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.EventDrivenPollingConsumer;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.spi.PollingConsumerPollStrategy;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GenericFilePollingConsumer extends EventDrivenPollingConsumer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GenericFilePollingConsumer.class);
+
+ private final long delay;
+
+ public GenericFilePollingConsumer(GenericFileEndpoint endpoint) throws Exception {
+ super(endpoint);
+ this.delay = endpoint.getDelay();
+ }
+
+ @Override
+ protected Consumer createConsumer() throws Exception {
+ // lets add ourselves as a consumer
+ GenericFileConsumer consumer = (GenericFileConsumer) super.createConsumer();
+ // do not start scheduler as we poll manually
+ consumer.setStartScheduler(false);
+ consumer.setMaxMessagesPerPoll(1);
+ // we only want to poll once so disconnect by default
+ return consumer;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ // ensure consumer is started
+ ServiceHelper.startService(getConsumer());
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
+ super.doShutdown();
+ }
+
+ @Override
+ protected GenericFileConsumer getConsumer() {
+ return (GenericFileConsumer) super.getConsumer();
+ }
+
+ @Override
+ public Exchange receiveNoWait() {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("receiveNoWait polling file: {}", getConsumer().getEndpoint());
+ }
+ int polled = doReceive(0);
+ if (polled > 0) {
+ return super.receive(0);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Exchange receive() {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("receive polling file: {}", getConsumer().getEndpoint());
+ }
+ int polled = doReceive(Long.MAX_VALUE);
+ if (polled > 0) {
+ return super.receive();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Exchange receive(long timeout) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("receive({}) polling file: {}", timeout, getConsumer().getEndpoint());
+ }
+ int polled = doReceive(timeout);
+ if (polled > 0) {
+ return super.receive(timeout);
+ } else {
+ return null;
+ }
+ }
+
+ protected int doReceive(long timeout) {
+ int retryCounter = -1;
+ boolean done = false;
+ Throwable cause = null;
+ int polledMessages = 0;
+ PollingConsumerPollStrategy pollStrategy = getConsumer().getPollStrategy();
+ boolean sendEmptyMessageWhenIdle = getConsumer() instanceof ScheduledBatchPollingConsumer && getConsumer().isSendEmptyMessageWhenIdle();
+ StopWatch watch = new StopWatch();
+
+ while (!done) {
+ try {
+ cause = null;
+ // eager assume we are done
+ done = true;
+ if (isRunAllowed()) {
+
+ if (retryCounter == -1) {
+ LOG.trace("Starting to poll: {}", this.getEndpoint());
+ } else {
+ LOG.debug("Retrying attempt {} to poll: {}", retryCounter, this.getEndpoint());
+ }
+
+ // mark we are polling which should also include the begin/poll/commit
+ boolean begin = pollStrategy.begin(this, getEndpoint());
+ if (begin) {
+ retryCounter++;
+ polledMessages = getConsumer().poll();
+ LOG.trace("Polled {} messages", polledMessages);
+
+ if (polledMessages == 0 && sendEmptyMessageWhenIdle) {
+ // send an "empty" exchange
+ processEmptyMessage();
+ } else if (polledMessages == 0 && timeout > 0) {
+ // if we did not poll a file and we are using timeout then try to poll again
+ done = false;
+ }
+
+ pollStrategy.commit(this, getEndpoint(), polledMessages);
+ } else {
+ LOG.debug("Cannot begin polling as pollStrategy returned false: {}", pollStrategy);
+ }
+ }
+
+ LOG.trace("Finished polling: {}", this.getEndpoint());
+ } catch (Exception e) {
+ try {
+ boolean retry = pollStrategy.rollback(this, getEndpoint(), retryCounter, e);
+ if (retry) {
+ // do not set cause as we retry
+ done = false;
+ } else {
+ cause = e;
+ done = true;
+ }
+ } catch (Throwable t) {
+ cause = t;
+ done = true;
+ }
+ } catch (Throwable t) {
+ cause = t;
+ done = true;
+ }
+
+ if (!done && timeout > 0) {
+ // prepare for next attempt until we hit timeout
+ long left = timeout - watch.taken();
+ long min = Math.min(left, delay);
+ if (min > 0) {
+ try {
+ // sleep for next poll
+ sleep(min);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ } else {
+ // timeout hit
+ done = true;
+ }
+ }
+ }
+
+ if (cause != null) {
+ throw ObjectHelper.wrapRuntimeCamelException(cause);
+ }
+
+ return polledMessages;
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ Object name = exchange.getIn().getHeader(Exchange.FILE_NAME);
+ if (name != null) {
+ LOG.debug("Received file: {}", name);
+ }
+ super.process(exchange);
+ }
+
+ /**
+ * No messages to poll so send an empty message instead.
+ *
+ * @throws Exception is thrown if error processing the empty message.
+ */
+ protected void processEmptyMessage() throws Exception {
+ Exchange exchange = getEndpoint().createExchange();
+ LOG.debug("Sending empty message as there were no messages from polling: {}", this.getEndpoint());
+ process(exchange);
+ }
+
+ private static void sleep(long delay) throws InterruptedException {
+ if (delay <= 0) {
+ return;
+ }
+ LOG.trace("Sleeping for: {} millis", delay);
+ Thread.sleep(delay);
+ }
+
+}
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
index df00e0d..066abf4 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
@@ -211,9 +211,17 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement
}
}
+ protected Consumer getConsumer() {
+ return consumer;
+ }
+
+ protected Consumer createConsumer() throws Exception {
+ return getEndpoint().createConsumer(this);
+ }
+
protected void doStart() throws Exception {
// lets add ourselves as a consumer
- consumer = getEndpoint().createConsumer(this);
+ consumer = createConsumer();
// if the consumer has a polling strategy then invoke that
if (consumer instanceof PollingConsumerPollingStrategy) {
diff --git a/camel-core/src/test/java/org/apache/camel/component/test/TestFileSplitTest.java b/camel-core/src/test/java/org/apache/camel/component/test/TestFileSplitTest.java
index 648e571..1732b0f 100644
--- a/camel-core/src/test/java/org/apache/camel/component/test/TestFileSplitTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/test/TestFileSplitTest.java
@@ -41,7 +41,7 @@ public class TestFileSplitTest extends ContextTestSupport {
@Override
public void configure() throws Exception {
from("direct:start")
- .to("test:file:target/testme?noop=true&split=true");
+ .to("test:file:target/testme?noop=true&split=true&timeout=1000");
}
});
context.start();
diff --git a/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java b/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
index 30f38a8..31cd3c1 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
@@ -45,7 +45,7 @@ public class ConsumerCacheZeroCapacityTest extends ContextTestSupport {
consumer.receive(50);
boolean found = Thread.getAllStackTraces().keySet().stream().anyMatch(t -> t.getName().contains("target/foo"));
- assertTrue("Should find file consumer thread", found);
+ assertFalse("Should not find file consumer thread", found);
cache.releasePollingConsumer(endpoint, consumer);
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java
index 4d6d287..3b417c5 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichBridgeErrorHandlerTest.java
@@ -50,7 +50,8 @@ public class PollEnrichBridgeErrorHandlerTest extends ContextTestSupport {
Exception caught = getMockEndpoint("mock:dead").getExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
assertNotNull(caught);
- assertEquals("Something went wrong", caught.getMessage());
+ assertTrue(caught.getMessage().startsWith("Error during poll"));
+ assertEquals("Something went wrong", caught.getCause().getCause().getMessage());
}
@Override
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
index 45406f8..846500e 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpEndpoint.java
@@ -83,7 +83,7 @@ public class FtpEndpoint<T extends FTPFile> extends RemoteFileEndpoint<FTPFile>
throw new FailedToCreateProducerException(this, e);
}
}
-
+
public RemoteFileOperations<FTPFile> createRemoteFileOperations() throws Exception {
// configure ftp client
FTPClient client = ftpClient;
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
index 884678a..a037afd 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java
@@ -19,10 +19,12 @@ package org.apache.camel.component.file.remote;
import java.util.Map;
import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileEndpoint;
import org.apache.camel.component.file.GenericFileExist;
+import org.apache.camel.component.file.GenericFilePollingConsumer;
import org.apache.camel.component.file.GenericFileProducer;
import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
import org.apache.camel.spi.UriParam;
@@ -128,6 +130,20 @@ public abstract class RemoteFileEndpoint<T> extends GenericFileEndpoint<T> {
return consumer;
}
+ @Override
+ public PollingConsumer createPollingConsumer() throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("Creating GenericFilePollingConsumer with queueSize: {} blockWhenFull: {} blockTimeout: {}",
+ getPollingConsumerQueueSize(), isPollingConsumerBlockWhenFull(), getPollingConsumerBlockTimeout());
+ }
+ GenericFilePollingConsumer result = new GenericFilePollingConsumer(this);
+ // should not call configurePollingConsumer when it is a GenericFilePollingConsumer
+ result.setBlockWhenFull(isPollingConsumerBlockWhenFull());
+ result.setBlockTimeout(getPollingConsumerBlockTimeout());
+
+ return result;
+ }
+
/**
* Validates this endpoint if its configured properly.
*
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
index 8bfeb4f..42341bc 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.component.file.remote;
-import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.Proxy;
import org.apache.camel.Processor;
import org.apache.camel.component.file.GenericFileConfiguration;
diff --git a/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollEnrichConsumeWithDisconnectAndDeleteTest.java b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollEnrichConsumeWithDisconnectAndDeleteTest.java
new file mode 100644
index 0000000..258cb66
--- /dev/null
+++ b/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollEnrichConsumeWithDisconnectAndDeleteTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class FtpPollEnrichConsumeWithDisconnectAndDeleteTest extends FtpServerTestSupport {
+
+ @Test
+ public void testFtpSimpleConsume() throws Exception {
+ if (!canTest()) {
+ return;
+ }
+
+ String expected = "Hello World";
+
+ // create the file under FTP_ROOT_DIR/poll using the regular file component
+ template.sendBodyAndHeader("file://" + FTP_ROOT_DIR + "/poll", expected, Exchange.FILE_NAME, "hello.txt");
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+ mock.expectedHeaderReceived(Exchange.FILE_NAME, "hello.txt");
+ mock.expectedBodiesReceived(expected);
+
+ ProducerTemplate triggerTemplate = context.createProducerTemplate();
+ triggerTemplate.sendBody("vm:trigger", "");
+
+ assertMockEndpointsSatisfied();
+ // stop checking as soon as the file is gone instead of spinning until the deadline
+ long startFileDeletionCheckTime = System.currentTimeMillis();
+ boolean fileExists = true;
+ while (fileExists && System.currentTimeMillis() - startFileDeletionCheckTime < 3000) { // wait up to 3000ms for file to be deleted
+ File file = new File(FTP_ROOT_DIR + "/poll/hello.txt");
+ fileExists = file.exists();
+
+ if (fileExists) {
+ log.info("Will check that file has been deleted again in 200ms");
+ Thread.sleep(200);
+ }
+ }
+
+ assertFalse("The file should have been deleted", fileExists);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("vm:trigger")
+ .pollEnrich("ftp://admin@localhost:" + getPort() + "/poll?password=admin&delete=true")
+ .routeId("foo")
+ .to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.