You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/06/28 13:39:06 UTC
svn commit: r1140537 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/test/java/org/apache/camel/component/file/
components/camel-ftp/src/test/java/org/apache/camel/component/f...
Author: davsclaus
Date: Tue Jun 28 11:39:06 2011
New Revision: 1140537
URL: http://svn.apache.org/viewvc?rev=1140537&view=rev
Log:
CAMEL-3655: Fixed polling consumer to suspend/resume schedule-based consumers so they do not keep running in the background after use. For example, an FTP consumer would otherwise keep polling the FTP server. Introduced PollingConsumerPollingStrategy to control that behavior.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollingConsumerTest.java
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollingConsumerTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorPollingConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java?rev=1140537&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollingStrategy.java Tue Jun 28 11:39:06 2011
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Strategy that allows a consumer to control how it is started and polled when used through a {@link PollingConsumer}.
+ */
+public interface PollingConsumerPollingStrategy {
+
+ /**
+ * Callback invoked when the polling consumer starts, in place of starting the consumer directly.
+ *
+ * @throws Exception if an error occurs during startup
+ */
+ void onStartup() throws Exception;
+
+ /**
+ * Callback invoked right before the polling consumer polls for an exchange.
+ *
+ * @throws Exception if an error occurs
+ */
+ void beforePoll() throws Exception;
+
+ /**
+ * Callback invoked after the poll has finished, whether or not an exchange was received.
+ *
+ * @throws Exception if an error occurs
+ */
+ void afterPoll() throws Exception;
+}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java?rev=1140537&r1=1140536&r2=1140537&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java Tue Jun 28 11:39:06 2011
@@ -18,11 +18,13 @@ package org.apache.camel.impl;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumerPollingStrategy;
import org.apache.camel.Processor;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ServiceHelper;
@@ -56,11 +58,19 @@ public class EventDrivenPollingConsumer
}
public Exchange receive() {
+ // must be started
+ if (!isRunAllowed() || !isStarted()) {
+ throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
+ }
+
while (isRunAllowed()) {
try {
+ beforePoll();
return queue.take();
} catch (InterruptedException e) {
handleInterruptedException(e);
+ } finally {
+ afterPoll();
}
}
LOG.trace("Consumer is not running, so returning null");
@@ -68,11 +78,24 @@ public class EventDrivenPollingConsumer
}
public Exchange receive(long timeout) {
+ // reject the call unless this polling consumer is started and allowed to run
+ if (!isRunAllowed() || !isStarted()) {
+ throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
+ }
+
+ // with no wait and an empty queue, return null right away (beforePoll/afterPoll are not invoked on this path)
+ if (timeout == 0 && queue.isEmpty()) {
+ return null;
+ }
+
try {
+ beforePoll();
return queue.poll(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
handleInterruptedException(e);
return null;
+ } finally {
+ afterPoll();
}
}
@@ -92,10 +115,40 @@ public class EventDrivenPollingConsumer
getInterruptedExceptionHandler().handleException(e);
}
+ protected void beforePoll() { // invokes the beforePoll callback when the consumer implements PollingConsumerPollingStrategy; no-op otherwise
+ if (consumer instanceof PollingConsumerPollingStrategy) {
+ PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer;
+ try {
+ strategy.beforePoll();
+ } catch (Exception e) { // best effort: a failing callback is only logged at DEBUG and is not rethrown
+ LOG.debug("Error occurred before polling " + consumer + ". This exception will be ignored.", e);
+ }
+ }
+ }
+
+ protected void afterPoll() { // invokes the afterPoll callback when the consumer implements PollingConsumerPollingStrategy; no-op otherwise
+ if (consumer instanceof PollingConsumerPollingStrategy) {
+ PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer;
+ try {
+ strategy.afterPoll();
+ } catch (Exception e) { // best effort: a failing callback is only logged at DEBUG and is not rethrown
+ LOG.debug("Error occurred after polling " + consumer + ". This exception will be ignored.", e);
+ }
+ }
+ }
+
protected void doStart() throws Exception {
// create the underlying consumer, passing this polling consumer as its processor
consumer = getEndpoint().createConsumer(this);
- ServiceHelper.startService(consumer);
+
+ // a consumer implementing the polling strategy handles its own startup via onStartup
+ if (consumer instanceof PollingConsumerPollingStrategy) {
+ PollingConsumerPollingStrategy strategy = (PollingConsumerPollingStrategy) consumer;
+ strategy.onStartup();
+ } else {
+ // any other consumer is started as a regular service
+ ServiceHelper.startService(consumer);
+ }
}
protected void doStop() throws Exception {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorPollingConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorPollingConsumer.java?rev=1140537&r1=1140536&r2=1140537&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorPollingConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProcessorPollingConsumer.java Tue Jun 28 11:39:06 2011
@@ -16,6 +16,8 @@
*/
package org.apache.camel.impl;
+import java.util.concurrent.RejectedExecutionException;
+
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -46,6 +48,11 @@ public class ProcessorPollingConsumer ex
}
public Exchange receive() {
+ // must be started
+ if (!isRunAllowed() || !isStarted()) {
+ throw new RejectedExecutionException(this + " is not started, but in state: " + getStatus().name());
+ }
+
Exchange exchange = getEndpoint().createExchange();
try {
processor.process(exchange);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1140537&r1=1140536&r2=1140537&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Tue Jun 28 11:39:06 2011
@@ -22,10 +22,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.Endpoint;
import org.apache.camel.LoggingLevel;
+import org.apache.camel.PollingConsumerPollingStrategy;
import org.apache.camel.Processor;
import org.apache.camel.SuspendableService;
import org.apache.camel.spi.PollingConsumerPollStrategy;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +36,7 @@ import org.slf4j.LoggerFactory;
*
* @version
*/
-public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService {
+public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService, PollingConsumerPollingStrategy {
private static final transient Logger LOG = LoggerFactory.getLogger(ScheduledPollConsumer.class);
private final ScheduledExecutorService executor;
@@ -270,4 +272,23 @@ public abstract class ScheduledPollConsu
protected void doSuspend() throws Exception {
// nothing to do: the future task is not stopped/cancelled, since the check happens in the run method
}
+
+ @Override
+ public void onStartup() throws Exception {
+ // start this consumer as a regular service
+ ServiceHelper.startService(this);
+ }
+
+ @Override
+ public void beforePoll() throws Exception {
+ // resume this consumer for the duration of the poll
+ ServiceHelper.resumeService(this);
+ }
+
+ @Override
+ public void afterPoll() throws Exception {
+ // suspend this consumer again once the poll is done
+ ServiceHelper.suspendService(this);
+ }
+
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollingConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollingConsumerTest.java?rev=1140537&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollingConsumerTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FilePollingConsumerTest.java Tue Jun 28 11:39:06 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
+
+/**
+ * Tests that using a polling consumer with a file endpoint does not leave the scheduled file consumer running
+ * in the background. It should suspend/resume the consumer on demand instead.
+ */
+public class FilePollingConsumerTest extends ContextTestSupport {
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ public void testPollingConsumer() throws Exception {
+ deleteDirectory("target/enrich");
+ template.sendBodyAndHeader("file:target/enrich", "Hello World", Exchange.FILE_NAME, "hello.txt");
+
+ PollingConsumer consumer = context.getEndpoint("file:target/enrich").createPollingConsumer();
+ consumer.start();
+ Exchange exchange = consumer.receive(5000);
+ assertNotNull(exchange);
+ assertEquals("Hello World", exchange.getIn().getBody(String.class));
+
+ // sleep a bit to ensure the polling consumer has been suspended after we have used it
+ Thread.sleep(1000);
+
+ // drop a new file which should not be picked up by the consumer
+ template.sendBodyAndHeader("file:target/enrich", "Bye World", Exchange.FILE_NAME, "bye.txt");
+
+ // sleep a bit to give a (wrongly) still-running consumer time to pick up that file
+ Thread.sleep(1000);
+
+ File file = new File("target/enrich/bye.txt").getAbsoluteFile();
+ assertTrue("File should exist " + file, file.exists());
+
+ // and no exchange should be waiting on the consumer, as the new file must not have been consumed
+ exchange = consumer.receiveNoWait();
+ assertNull(exchange);
+
+ consumer.stop();
+ }
+
+}
Added: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollingConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollingConsumerTest.java?rev=1140537&view=auto
==============================================================================
--- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollingConsumerTest.java (added)
+++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FtpPollingConsumerTest.java Tue Jun 28 11:39:06 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
+import org.junit.Test;
+
+/**
+ * Tests that using a polling consumer with an FTP endpoint does not leave the scheduled FTP consumer polling in the background.
+ */
+public class FtpPollingConsumerTest extends FtpServerTestSupport {
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ private String getFtpUrl() {
+ return "ftp://admin@localhost:" + getPort() + "/polling?password=admin";
+ }
+
+ @Test
+ public void testPollingConsumer() throws Exception {
+ template.sendBodyAndHeader(getFtpUrl(), "Hello World", Exchange.FILE_NAME, "hello.txt");
+
+ PollingConsumer consumer = context.getEndpoint(getFtpUrl()).createPollingConsumer();
+ consumer.start();
+ Exchange exchange = consumer.receive(5000);
+ assertNotNull(exchange);
+ assertEquals("Hello World", exchange.getIn().getBody(String.class));
+
+ // sleep a bit to ensure the polling consumer has been suspended after we have used it
+ Thread.sleep(1000);
+
+ // drop a new file which should not be picked up by the consumer
+ template.sendBodyAndHeader(getFtpUrl(), "Bye World", Exchange.FILE_NAME, "bye.txt");
+
+ // sleep a bit to give a (wrongly) still-running consumer time to pick up that file
+ Thread.sleep(1000);
+
+ File file = new File(FTP_ROOT_DIR + "polling/bye.txt").getAbsoluteFile();
+ assertTrue("File should exist " + file, file.exists());
+
+ // and no exchange should be waiting on the consumer, as the new file must not have been consumed
+ exchange = consumer.receiveNoWait();
+ assertNull(exchange);
+
+ consumer.stop();
+ }
+
+}