You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2008/12/03 00:44:48 UTC

svn commit: r722674 - in /activemq/camel/trunk/components: camel-atom/src/main/java/org/apache/camel/component/atom/ camel-atom/src/main/java/org/apache/camel/component/feed/ camel-atom/src/test/java/org/apache/camel/component/atom/ camel-rss/src/main/...

Author: janstey
Date: Tue Dec  2 15:44:48 2008
New Revision: 722674

URL: http://svn.apache.org/viewvc?rev=722674&view=rev
Log:
CAMEL-1101 - Moving more common things into the feed component.

Added:
    activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java   (with props)
    activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java   (with props)
    activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java   (with props)
    activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssUtils.java   (with props)
    activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/UpdatedDateFilter.java   (with props)
Removed:
    activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/EntryFilter.java
    activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedConsumer.java
    activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssConsumerSupport.java
Modified:
    activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java
    activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java
    activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomPollingConsumer.java
    activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/UpdatedDateFilter.java
    activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEndpoint.java
    activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/UpdatedDateFilterTest.java
    activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEndpoint.java
    activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEntryPollingConsumer.java
    activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssPollingConsumer.java

Modified: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java Tue Dec  2 15:44:48 2008
@@ -25,8 +25,8 @@
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.feed.FeedComponent;
-import org.apache.camel.component.feed.FeedConsumer;
 import org.apache.camel.component.feed.FeedEndpoint;
+import org.apache.camel.component.feed.FeedPollingConsumer;
 import org.apache.camel.impl.DefaultPollingEndpoint;
 import org.apache.camel.util.ObjectHelper;
 
@@ -68,12 +68,12 @@
     }
 
     @Override
-    protected FeedConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate) {
+    protected FeedPollingConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate) {
         return new AtomEntryPollingConsumer(this, processor, filter, lastUpdate);
     }  
     
     @Override
-    protected FeedConsumer createPollingConsumer(FeedEndpoint feedEndpoint, Processor processor) {
+    protected FeedPollingConsumer createPollingConsumer(FeedEndpoint feedEndpoint, Processor processor) {
         return new AtomPollingConsumer(this, processor); 
     }
 }

Modified: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java (original)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java Tue Dec  2 15:44:48 2008
@@ -26,49 +26,21 @@
 import org.apache.abdera.parser.ParseException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.feed.EntryFilter;
+import org.apache.camel.component.feed.FeedEntryPollingConsumer;
 
 /**
  * Consumer to poll atom feeds and return each entry from the feed step by step.
  *
  * @version $Revision$
  */
-public class AtomEntryPollingConsumer extends AtomPollingConsumer {
+public class AtomEntryPollingConsumer extends FeedEntryPollingConsumer {
     private Document<Feed> document;
-    private int entryIndex;
-    private EntryFilter entryFilter;
-    private List<Entry> list;
-
-    public AtomEntryPollingConsumer(AtomEndpoint endpoint, Processor processor, boolean filter,
-                                    Date lastUpdate) {
-        super(endpoint, processor);
-        if (filter) {
-            entryFilter = new UpdatedDateFilter(lastUpdate);
-        }
-    }
-
-    public void poll() throws Exception {
-        getDocument();
-        Feed feed = document.getRoot();
-
-        while (hasNextEntry()) {
-            Entry entry = list.get(entryIndex--);
-
-            boolean valid = true;
-            if (entryFilter != null) {
-                valid = entryFilter.isValidEntry((AtomEndpoint)endpoint, document, entry);
-            }
-            if (valid) {
-                Exchange exchange = endpoint.createExchange(feed, entry);
-                getProcessor().process(exchange);
-                // return and wait for the next poll to continue from last time (this consumer is stateful)
-                return;
-            }
-        }
-
-        // reset document to be able to poll again
-        document = null;
-    }
 
+    public AtomEntryPollingConsumer(AtomEndpoint endpoint, Processor processor, boolean filter, Date lastUpdate) {
+        super(endpoint, processor, filter, lastUpdate);
+    }   
+    
     private Document<Feed> getDocument() throws IOException, ParseException {
         if (document == null) {
             document = AtomUtils.parseDocument(endpoint.getFeedUri());
@@ -78,8 +50,23 @@
         return document;
     }
 
-    private boolean hasNextEntry() {
-        return entryIndex >= 0;
+    @Override
+    protected void populateList(Object feed) throws ParseException, IOException {
+        // list is populated already in the createFeed method
+    }
+
+    @Override
+    protected Object createFeed() throws IOException {
+        return getDocument().getRoot();
     }
 
+    @Override
+    protected void resetList() {
+        document = null;    
+    }
+    
+    @Override
+    protected EntryFilter createEntryFilter(Date lastUpdate) {
+        return new UpdatedDateFilter(lastUpdate);
+    }
 }

Modified: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomPollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomPollingConsumer.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomPollingConsumer.java (original)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomPollingConsumer.java Tue Dec  2 15:44:48 2008
@@ -16,28 +16,27 @@
  */
 package org.apache.camel.component.atom;
 
+import java.io.IOException;
+
 import org.apache.abdera.model.Document;
 import org.apache.abdera.model.Feed;
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.component.feed.FeedConsumer;
+import org.apache.camel.component.feed.FeedPollingConsumer;
 
 /**
  * Consumer to poll atom feeds and return the full feed.
  *
  * @version $Revision$
  */
-public class AtomPollingConsumer extends FeedConsumer {
+public class AtomPollingConsumer extends FeedPollingConsumer {
 
     public AtomPollingConsumer(AtomEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
     }
 
-    protected void poll() throws Exception {
+    @Override
+    protected Object createFeed() throws IOException {
         Document<Feed> document = AtomUtils.parseDocument(endpoint.getFeedUri());
-        Feed feed = document.getRoot();
-        Exchange exchange = endpoint.createExchange(feed);
-        getProcessor().process(exchange);
+        return document.getRoot();
     }
-
 }

Modified: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/UpdatedDateFilter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/UpdatedDateFilter.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/UpdatedDateFilter.java (original)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/UpdatedDateFilter.java Tue Dec  2 15:44:48 2008
@@ -21,6 +21,8 @@
 import org.apache.abdera.model.Document;
 import org.apache.abdera.model.Entry;
 import org.apache.abdera.model.Feed;
+import org.apache.camel.component.feed.EntryFilter;
+import org.apache.camel.component.feed.FeedEndpoint;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -39,8 +41,8 @@
         this.lastUpdate = lastUpdate;
     }
 
-    public boolean isValidEntry(AtomEndpoint endpoint, Document<Feed> feed, Entry entry) {
-        Date updated = entry.getUpdated();
+    public boolean isValidEntry(FeedEndpoint endpoint, Object feed, Object entry) {        
+        Date updated = ((Entry)entry).getUpdated();
         if (updated == null) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("No updated time for entry so assuming its valid: entry=[" + entry + "]");

Added: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java?rev=722674&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java (added)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java Tue Dec  2 15:44:48 2008
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.feed;
+
+
+/**
+ * Filter used by the {@link org.apache.camel.component.feed.FeedEntryPollingConsumer} to filter entries
+ * from the feed.
+ *
+ * @version $Revision: 656106 $
+ */
+public interface EntryFilter {
+
+    /**
+     * Tests to be used as filtering the feed for only entries of interest, such as only new entries, etc.
+     *
+     * @param endpoint  the endpoint
+     * @param feed      the feed
+     * @param entry     the given entry to filter
+     * @return  <tt>true</tt> to include the entry, <ff>false</tt> to skip it
+     */
+    boolean isValidEntry(FeedEndpoint endpoint, Object feed, Object entry);
+
+}

Propchange: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEndpoint.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEndpoint.java Tue Dec  2 15:44:48 2008
@@ -18,8 +18,6 @@
 
 import java.util.Date;
 
-import org.apache.abdera.model.Entry;
-import org.apache.abdera.model.Feed;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -64,7 +62,7 @@
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        FeedConsumer answer;
+        FeedPollingConsumer answer;
         if (isSplitEntries()) {
             answer = createEntryPollingConsumer(this, processor, filter, lastUpdate);
         } else {
@@ -73,14 +71,14 @@
         
         // ScheduledPollConsumer default delay is 500 millis and that is too often for polling a feed,
         // so we override with a new default value. End user can override this value by providing a consumer.delay parameter
-        answer.setDelay(FeedConsumer.DEFAULT_CONSUMER_DELAY);
+        answer.setDelay(FeedPollingConsumer.DEFAULT_CONSUMER_DELAY);
         configureConsumer(answer);
         return answer;
     }
 
-    protected abstract FeedConsumer createPollingConsumer(FeedEndpoint feedEndpoint, Processor processor);
+    protected abstract FeedPollingConsumer createPollingConsumer(FeedEndpoint feedEndpoint, Processor processor);
 
-    protected abstract FeedConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate);
+    protected abstract FeedPollingConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate);
 
     protected Exchange createExchangeWithFeedHeader(Object feed, String header) {
         Exchange exchange = createExchange();

Added: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java?rev=722674&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java (added)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java Tue Dec  2 15:44:48 2008
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.feed;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+/**
+ * Consumer to poll feeds and return each entry from the feed step by step.
+ *
+ */
+public abstract class FeedEntryPollingConsumer extends FeedPollingConsumer {
+    protected int entryIndex;
+    protected EntryFilter entryFilter;
+    protected List list;
+
+    public FeedEntryPollingConsumer(FeedEndpoint endpoint, Processor processor, boolean filter, Date lastUpdate) {
+        super(endpoint, processor);
+        if (filter) {
+            entryFilter = createEntryFilter(lastUpdate);
+        }
+    }
+
+    public void poll() throws Exception {
+        Object feed = createFeed();
+        populateList(feed);   
+
+        while (hasNextEntry()) {
+            Object entry = list.get(entryIndex--);
+
+            boolean valid = true;
+            if (entryFilter != null) {
+                valid = entryFilter.isValidEntry(endpoint, feed, entry);
+            }
+            if (valid) {
+                Exchange exchange = endpoint.createExchange(feed, entry);
+                getProcessor().process(exchange);
+                // return and wait for the next poll to continue from last time (this consumer is stateful)
+                return;
+            }
+        }
+
+        // reset list to be able to poll again
+        resetList();
+    }
+
+    protected abstract EntryFilter createEntryFilter(Date lastUpdate);
+    
+    protected abstract void resetList();
+
+    protected abstract void populateList(Object feed) throws Exception; 
+    
+    private boolean hasNextEntry() {
+        return entryIndex >= 0;
+    }
+}

Propchange: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java?rev=722674&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java (added)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java Tue Dec  2 15:44:48 2008
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.feed;
+
+import java.io.IOException;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledPollConsumer;
+
+/**
+ * Base class for consuming feeds.
+ */
+public abstract class FeedPollingConsumer extends ScheduledPollConsumer {
+    public static final long DEFAULT_CONSUMER_DELAY = 60 * 1000L;
+    protected final FeedEndpoint endpoint;
+
+    public FeedPollingConsumer(FeedEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    protected void poll() throws Exception {
+        Exchange exchange = endpoint.createExchange(createFeed());
+        getProcessor().process(exchange);
+    }
+
+    protected abstract Object createFeed() throws Exception;
+
+}

Propchange: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/UpdatedDateFilterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/UpdatedDateFilterTest.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/UpdatedDateFilterTest.java (original)
+++ activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/UpdatedDateFilterTest.java Tue Dec  2 15:44:48 2008
@@ -26,6 +26,7 @@
 import org.apache.abdera.model.Document;
 import org.apache.abdera.model.Entry;
 import org.apache.abdera.model.Feed;
+import org.apache.camel.component.feed.EntryFilter;
 
 /**
  * Unit test for UpdatedDateFilter

Modified: activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEndpoint.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEndpoint.java Tue Dec  2 15:44:48 2008
@@ -25,11 +25,9 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.component.atom.AtomEntryPollingConsumer;
-import org.apache.camel.component.atom.AtomPollingConsumer;
 import org.apache.camel.component.feed.FeedComponent;
-import org.apache.camel.component.feed.FeedConsumer;
 import org.apache.camel.component.feed.FeedEndpoint;
+import org.apache.camel.component.feed.FeedPollingConsumer;
 import org.apache.camel.impl.DefaultPollingEndpoint;
 
 /**
@@ -69,12 +67,12 @@
     }
 
     @Override
-    protected FeedConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate) {
+    protected FeedPollingConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate) {
         return new RssEntryPollingConsumer(this, processor, filter, lastUpdate);
     }  
     
     @Override
-    protected FeedConsumer createPollingConsumer(FeedEndpoint feedEndpoint, Processor processor) {
+    protected FeedPollingConsumer createPollingConsumer(FeedEndpoint feedEndpoint, Processor processor) {
         return new RssPollingConsumer(this, processor); 
     }
 }

Modified: activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEntryPollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEntryPollingConsumer.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEntryPollingConsumer.java (original)
+++ activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEntryPollingConsumer.java Tue Dec  2 15:44:48 2008
@@ -17,54 +17,43 @@
 package org.apache.camel.component.rss;
 
 import java.util.Date;
-import java.util.List;
 
-import com.sun.syndication.feed.synd.SyndEntry;
 import com.sun.syndication.feed.synd.SyndFeed;
 
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.feed.EntryFilter;
+import org.apache.camel.component.feed.FeedEntryPollingConsumer;
+
 
 /**
  * Consumer to poll RSS feeds and return each entry from the feed step by step.
  *
  */
-public class RssEntryPollingConsumer extends RssPollingConsumer {
-    private int entryIndex;
-    private List<SyndEntry> list;
-
-    public RssEntryPollingConsumer(RssEndpoint endpoint, Processor processor) {
-        super(endpoint, processor);
-    }
+public class RssEntryPollingConsumer extends FeedEntryPollingConsumer {
 
     public RssEntryPollingConsumer(RssEndpoint endpoint, Processor processor, boolean filter, Date lastUpdate) {
-        this(endpoint, processor);
-    }
-
-    public void poll() throws Exception {
-        SyndFeed feed = createFeed();
-        populateList(feed);        
-
-        while (hasNextEntry()) {
-            SyndEntry entry = list.get(entryIndex--);
-            Exchange exchange = endpoint.createExchange(feed, entry);
-            getProcessor().process(exchange);
-            // return and wait for the next poll to continue from last time (this consumer is stateful)
-            return;
-        }
-        
-        list = null;
+        super(endpoint, processor, filter, lastUpdate);
     }
-
-    private void populateList(SyndFeed feed) {
+    
+    @Override
+    protected void populateList(Object feed) throws Exception {
         if (list == null) {
-            list = feed.getEntries();
+            list = ((SyndFeed)feed).getEntries();
             entryIndex = list.size() - 1;
         }
     }
 
-    private boolean hasNextEntry() {
-        return entryIndex >= 0;
+    @Override
+    protected Object createFeed() throws Exception {
+        return RssUtils.createFeed(endpoint.getFeedUri());
     }
 
+    @Override
+    protected void resetList() {
+        list = null;    
+    }
+    
+    protected EntryFilter createEntryFilter(Date lastUpdate) {
+        return new UpdatedDateFilter(lastUpdate);
+    }    
 }

Modified: activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssPollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssPollingConsumer.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssPollingConsumer.java (original)
+++ activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssPollingConsumer.java Tue Dec  2 15:44:48 2008
@@ -16,24 +16,26 @@
  */
 package org.apache.camel.component.rss;
 
+import java.io.IOException;
+
 import com.sun.syndication.feed.synd.SyndFeed;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.feed.FeedPollingConsumer;
 
 /**
  * Consumer to poll RSS feeds and return the full feed.
  */
-public class RssPollingConsumer extends RssConsumerSupport {
+public class RssPollingConsumer extends FeedPollingConsumer {
 
     public RssPollingConsumer(RssEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
     }
 
-    protected void poll() throws Exception {
-        SyndFeed feed = createFeed();
-        Exchange exchange = endpoint.createExchange(feed);
-        getProcessor().process(exchange);        
+    @Override
+    protected Object createFeed() throws Exception {
+        return RssUtils.createFeed(endpoint.getFeedUri());
     }
 
 }

Added: activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssUtils.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssUtils.java?rev=722674&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssUtils.java (added)
+++ activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssUtils.java Tue Dec  2 15:44:48 2008
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rss;
+
+import java.io.InputStream;
+import java.net.URL;
+
+import com.sun.syndication.feed.synd.SyndFeed;
+import com.sun.syndication.io.SyndFeedInput;
+import com.sun.syndication.io.XmlReader;
+
+public final class RssUtils {
+    private RssUtils() {
+        // Helper class
+    }
+    
+    public static SyndFeed createFeed(String feedUri) throws Exception {
+        InputStream in = new URL(feedUri).openStream();
+        SyndFeedInput input = new SyndFeedInput();
+        return input.build(new XmlReader(in));
+    }
+}

Propchange: activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/UpdatedDateFilter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/UpdatedDateFilter.java?rev=722674&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/UpdatedDateFilter.java (added)
+++ activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/UpdatedDateFilter.java Tue Dec  2 15:44:48 2008
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rss;
+
+import java.util.Date;
+
+import com.sun.syndication.feed.synd.SyndEntry;
+
+import org.apache.camel.component.feed.EntryFilter;
+import org.apache.camel.component.feed.FeedEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Filters out all entries which occur before the last time of the entry we saw (assuming
+ * entries arrive sorted in order).
+ *
+ * @version $Revision: 656106 $
+ */
+public class UpdatedDateFilter implements EntryFilter {
+
+    private static final transient Log LOG = LogFactory.getLog(UpdatedDateFilter.class);
+    private Date lastUpdate;
+
+    public UpdatedDateFilter(Date lastUpdate) {
+        this.lastUpdate = lastUpdate;
+    }
+
+    public boolean isValidEntry(FeedEndpoint endpoint, Object feed, Object entry) {        
+        Date updated = ((SyndEntry)entry).getUpdatedDate();
+        if (updated == null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("No updated time for entry so assuming its valid: entry=[" + entry + "]");
+            }
+            return true;
+        }
+        if (lastUpdate != null) {
+            if (lastUpdate.after(updated)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Entry is older than lastupdate=[" + lastUpdate
+                        + "], no valid entry=[" + entry + "]");
+                }
+                return false;
+            }
+        }
+        lastUpdate = updated;
+        return true;
+    }
+
+}

Propchange: activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/UpdatedDateFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native