You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/04/03 18:40:15 UTC
svn commit: r525204 - in /activemq/camel/trunk:
camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java
camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
Author: jstrachan
Date: Tue Apr 3 09:40:13 2007
New Revision: 525204
URL: http://svn.apache.org/viewvc?view=rev&rev=525204
Log:
minor refactor of the exception handling being in the base class
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java?view=diff&rev=525204&r1=525203&r2=525204
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java Tue Apr 3 09:40:13 2007
@@ -20,6 +20,8 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -31,6 +33,8 @@
* @version $Revision$
*/
public abstract class PollingConsumer<E extends Exchange> extends DefaultConsumer<E> implements Runnable {
+ private static final transient Log log = LogFactory.getLog(PollingConsumer.class);
+
private long initialDelay = 1000;
private long delay = 500;
private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
@@ -41,6 +45,19 @@
super(endpoint, processor);
}
+ /**
+ * Invoked whenever we should be polled
+ */
+ public void run() {
+ log.debug("Starting to poll");
+ try {
+ poll();
+ }
+ catch (Exception e) {
+ log.warn("Caught: " + e, e);
+ }
+ }
+
// Properties
//-------------------------------------------------------------------------
public long getInitialDelay() {
@@ -77,6 +94,13 @@
// Implementation methods
//-------------------------------------------------------------------------
+
+ /**
+ * The polling method which is invoked periodically to poll this consumer
+ *
+ * @throws Exception
+ */
+ protected abstract void poll() throws Exception;
@Override
protected void doStart() throws Exception {
Modified: activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?view=diff&rev=525204&r1=525203&r2=525204
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Tue Apr 3 09:40:13 2007
@@ -47,36 +47,27 @@
this.template = endpoint.createTransactionStrategy();
}
- /**
- * Invoked whenever we should be polled
- */
- public synchronized void run() {
- log.debug("Starting to poll for new database entities to process");
- try {
- template.execute(new JpaCallback() {
- public Object doInJpa(EntityManager entityManager) throws PersistenceException {
- Query query = getQueryFactory().createQuery(entityManager);
- configureParameters(query);
- List results = query.getResultList();
- for (Object result : results) {
- if (log.isDebugEnabled()) {
- log.debug("Processing new entity: " + result);
- }
+ protected void poll() throws Exception {
+ template.execute(new JpaCallback() {
+ public Object doInJpa(EntityManager entityManager) throws PersistenceException {
+ Query query = getQueryFactory().createQuery(entityManager);
+ configureParameters(query);
+ List results = query.getResultList();
+ for (Object result : results) {
+ if (log.isDebugEnabled()) {
+ log.debug("Processing new entity: " + result);
+ }
- if (lockEntity(result, entityManager)) {
- // lets turn the result into an exchange and fire it into the processor
- Exchange exchange = createExchange(result);
- getProcessor().onExchange(exchange);
- getDeleteHandler().deleteObject(entityManager, result);
- }
+ if (lockEntity(result, entityManager)) {
+ // lets turn the result into an exchange and fire it into the processor
+ Exchange exchange = createExchange(result);
+ getProcessor().onExchange(exchange);
+ getDeleteHandler().deleteObject(entityManager, result);
}
- return null;
}
- });
- }
- catch (RuntimeException e) {
- log.warn("Caught: " + e, e);
- }
+ return null;
+ }
+ });
}
// Properties