You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2008/12/03 00:44:48 UTC
svn commit: r722674 - in /activemq/camel/trunk/components:
camel-atom/src/main/java/org/apache/camel/component/atom/
camel-atom/src/main/java/org/apache/camel/component/feed/
camel-atom/src/test/java/org/apache/camel/component/atom/
camel-rss/src/main/...
Author: janstey
Date: Tue Dec 2 15:44:48 2008
New Revision: 722674
URL: http://svn.apache.org/viewvc?rev=722674&view=rev
Log:
CAMEL-1101 - Moving more common things into the feed component.
Added:
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java (with props)
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java (with props)
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java (with props)
activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssUtils.java (with props)
activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/UpdatedDateFilter.java (with props)
Removed:
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/EntryFilter.java
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedConsumer.java
activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssConsumerSupport.java
Modified:
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomPollingConsumer.java
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/UpdatedDateFilter.java
activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEndpoint.java
activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/UpdatedDateFilterTest.java
activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEndpoint.java
activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEntryPollingConsumer.java
activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssPollingConsumer.java
Modified: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEndpoint.java Tue Dec 2 15:44:48 2008
@@ -25,8 +25,8 @@
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.feed.FeedComponent;
-import org.apache.camel.component.feed.FeedConsumer;
import org.apache.camel.component.feed.FeedEndpoint;
+import org.apache.camel.component.feed.FeedPollingConsumer;
import org.apache.camel.impl.DefaultPollingEndpoint;
import org.apache.camel.util.ObjectHelper;
@@ -68,12 +68,12 @@
}
@Override
- protected FeedConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate) {
+ protected FeedPollingConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate) {
return new AtomEntryPollingConsumer(this, processor, filter, lastUpdate);
}
@Override
- protected FeedConsumer createPollingConsumer(FeedEndpoint feedEndpoint, Processor processor) {
+ protected FeedPollingConsumer createPollingConsumer(FeedEndpoint feedEndpoint, Processor processor) {
return new AtomPollingConsumer(this, processor);
}
}
Modified: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java (original)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomEntryPollingConsumer.java Tue Dec 2 15:44:48 2008
@@ -26,49 +26,21 @@
import org.apache.abdera.parser.ParseException;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.component.feed.EntryFilter;
+import org.apache.camel.component.feed.FeedEntryPollingConsumer;
/**
* Consumer to poll atom feeds and return each entry from the feed step by step.
*
* @version $Revision$
*/
-public class AtomEntryPollingConsumer extends AtomPollingConsumer {
+public class AtomEntryPollingConsumer extends FeedEntryPollingConsumer {
private Document<Feed> document;
- private int entryIndex;
- private EntryFilter entryFilter;
- private List<Entry> list;
-
- public AtomEntryPollingConsumer(AtomEndpoint endpoint, Processor processor, boolean filter,
- Date lastUpdate) {
- super(endpoint, processor);
- if (filter) {
- entryFilter = new UpdatedDateFilter(lastUpdate);
- }
- }
-
- public void poll() throws Exception {
- getDocument();
- Feed feed = document.getRoot();
-
- while (hasNextEntry()) {
- Entry entry = list.get(entryIndex--);
-
- boolean valid = true;
- if (entryFilter != null) {
- valid = entryFilter.isValidEntry((AtomEndpoint)endpoint, document, entry);
- }
- if (valid) {
- Exchange exchange = endpoint.createExchange(feed, entry);
- getProcessor().process(exchange);
- // return and wait for the next poll to continue from last time (this consumer is stateful)
- return;
- }
- }
-
- // reset document to be able to poll again
- document = null;
- }
+ public AtomEntryPollingConsumer(AtomEndpoint endpoint, Processor processor, boolean filter, Date lastUpdate) {
+ super(endpoint, processor, filter, lastUpdate);
+ }
+
private Document<Feed> getDocument() throws IOException, ParseException {
if (document == null) {
document = AtomUtils.parseDocument(endpoint.getFeedUri());
@@ -78,8 +50,23 @@
return document;
}
- private boolean hasNextEntry() {
- return entryIndex >= 0;
+ @Override
+ protected void populateList(Object feed) throws ParseException, IOException {
+ // list is populated already in the createFeed method
+ }
+
+ @Override
+ protected Object createFeed() throws IOException {
+ return getDocument().getRoot();
}
+ @Override
+ protected void resetList() {
+ document = null;
+ }
+
+ @Override
+ protected EntryFilter createEntryFilter(Date lastUpdate) {
+ return new UpdatedDateFilter(lastUpdate);
+ }
}
Modified: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomPollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomPollingConsumer.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomPollingConsumer.java (original)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/AtomPollingConsumer.java Tue Dec 2 15:44:48 2008
@@ -16,28 +16,27 @@
*/
package org.apache.camel.component.atom;
+import java.io.IOException;
+
import org.apache.abdera.model.Document;
import org.apache.abdera.model.Feed;
-import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.component.feed.FeedConsumer;
+import org.apache.camel.component.feed.FeedPollingConsumer;
/**
* Consumer to poll atom feeds and return the full feed.
*
* @version $Revision$
*/
-public class AtomPollingConsumer extends FeedConsumer {
+public class AtomPollingConsumer extends FeedPollingConsumer {
public AtomPollingConsumer(AtomEndpoint endpoint, Processor processor) {
super(endpoint, processor);
}
- protected void poll() throws Exception {
+ @Override
+ protected Object createFeed() throws IOException {
Document<Feed> document = AtomUtils.parseDocument(endpoint.getFeedUri());
- Feed feed = document.getRoot();
- Exchange exchange = endpoint.createExchange(feed);
- getProcessor().process(exchange);
+ return document.getRoot();
}
-
}
Modified: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/UpdatedDateFilter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/UpdatedDateFilter.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/UpdatedDateFilter.java (original)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/atom/UpdatedDateFilter.java Tue Dec 2 15:44:48 2008
@@ -21,6 +21,8 @@
import org.apache.abdera.model.Document;
import org.apache.abdera.model.Entry;
import org.apache.abdera.model.Feed;
+import org.apache.camel.component.feed.EntryFilter;
+import org.apache.camel.component.feed.FeedEndpoint;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,8 +41,8 @@
this.lastUpdate = lastUpdate;
}
- public boolean isValidEntry(AtomEndpoint endpoint, Document<Feed> feed, Entry entry) {
- Date updated = entry.getUpdated();
+ public boolean isValidEntry(FeedEndpoint endpoint, Object feed, Object entry) {
+ Date updated = ((Entry)entry).getUpdated();
if (updated == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("No updated time for entry so assuming its valid: entry=[" + entry + "]");
Added: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java?rev=722674&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java (added)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java Tue Dec 2 15:44:48 2008
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.feed;
+
+
+/**
+ * Filter used by the {@link org.apache.camel.component.feed.FeedEntryPollingConsumer} to filter entries
+ * from the feed.
+ *
+ * @version $Revision: 656106 $
+ */
+public interface EntryFilter {
+
+ /**
+ * Tests to be used as filtering the feed for only entries of interest, such as only new entries, etc.
+ *
+ * @param endpoint the endpoint
+ * @param feed the feed
+ * @param entry the given entry to filter
+ * @return <tt>true</tt> to include the entry, <ff>false</tt> to skip it
+ */
+ boolean isValidEntry(FeedEndpoint endpoint, Object feed, Object entry);
+
+}
Propchange: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEndpoint.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEndpoint.java Tue Dec 2 15:44:48 2008
@@ -18,8 +18,6 @@
import java.util.Date;
-import org.apache.abdera.model.Entry;
-import org.apache.abdera.model.Feed;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -64,7 +62,7 @@
}
public Consumer createConsumer(Processor processor) throws Exception {
- FeedConsumer answer;
+ FeedPollingConsumer answer;
if (isSplitEntries()) {
answer = createEntryPollingConsumer(this, processor, filter, lastUpdate);
} else {
@@ -73,14 +71,14 @@
// ScheduledPollConsumer default delay is 500 millis and that is too often for polling a feed,
// so we override with a new default value. End user can override this value by providing a consumer.delay parameter
- answer.setDelay(FeedConsumer.DEFAULT_CONSUMER_DELAY);
+ answer.setDelay(FeedPollingConsumer.DEFAULT_CONSUMER_DELAY);
configureConsumer(answer);
return answer;
}
- protected abstract FeedConsumer createPollingConsumer(FeedEndpoint feedEndpoint, Processor processor);
+ protected abstract FeedPollingConsumer createPollingConsumer(FeedEndpoint feedEndpoint, Processor processor);
- protected abstract FeedConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate);
+ protected abstract FeedPollingConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate);
protected Exchange createExchangeWithFeedHeader(Object feed, String header) {
Exchange exchange = createExchange();
Added: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java?rev=722674&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java (added)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java Tue Dec 2 15:44:48 2008
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.feed;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+/**
+ * Consumer to poll feeds and return each entry from the feed step by step.
+ *
+ */
+public abstract class FeedEntryPollingConsumer extends FeedPollingConsumer {
+ protected int entryIndex;
+ protected EntryFilter entryFilter;
+ protected List list;
+
+ public FeedEntryPollingConsumer(FeedEndpoint endpoint, Processor processor, boolean filter, Date lastUpdate) {
+ super(endpoint, processor);
+ if (filter) {
+ entryFilter = createEntryFilter(lastUpdate);
+ }
+ }
+
+ public void poll() throws Exception {
+ Object feed = createFeed();
+ populateList(feed);
+
+ while (hasNextEntry()) {
+ Object entry = list.get(entryIndex--);
+
+ boolean valid = true;
+ if (entryFilter != null) {
+ valid = entryFilter.isValidEntry(endpoint, feed, entry);
+ }
+ if (valid) {
+ Exchange exchange = endpoint.createExchange(feed, entry);
+ getProcessor().process(exchange);
+ // return and wait for the next poll to continue from last time (this consumer is stateful)
+ return;
+ }
+ }
+
+ // reset list to be able to poll again
+ resetList();
+ }
+
+ protected abstract EntryFilter createEntryFilter(Date lastUpdate);
+
+ protected abstract void resetList();
+
+ protected abstract void populateList(Object feed) throws Exception;
+
+ private boolean hasNextEntry() {
+ return entryIndex >= 0;
+ }
+}
Propchange: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java?rev=722674&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java (added)
+++ activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java Tue Dec 2 15:44:48 2008
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.feed;
+
+import java.io.IOException;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledPollConsumer;
+
+/**
+ * Base class for consuming feeds.
+ */
+public abstract class FeedPollingConsumer extends ScheduledPollConsumer {
+ public static final long DEFAULT_CONSUMER_DELAY = 60 * 1000L;
+ protected final FeedEndpoint endpoint;
+
+ public FeedPollingConsumer(FeedEndpoint endpoint, Processor processor) {
+ super(endpoint, processor);
+ this.endpoint = endpoint;
+ }
+
+ protected void poll() throws Exception {
+ Exchange exchange = endpoint.createExchange(createFeed());
+ getProcessor().process(exchange);
+ }
+
+ protected abstract Object createFeed() throws Exception;
+
+}
Propchange: activemq/camel/trunk/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedPollingConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/UpdatedDateFilterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/UpdatedDateFilterTest.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/UpdatedDateFilterTest.java (original)
+++ activemq/camel/trunk/components/camel-atom/src/test/java/org/apache/camel/component/atom/UpdatedDateFilterTest.java Tue Dec 2 15:44:48 2008
@@ -26,6 +26,7 @@
import org.apache.abdera.model.Document;
import org.apache.abdera.model.Entry;
import org.apache.abdera.model.Feed;
+import org.apache.camel.component.feed.EntryFilter;
/**
* Unit test for UpdatedDateFilter
Modified: activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEndpoint.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEndpoint.java Tue Dec 2 15:44:48 2008
@@ -25,11 +25,9 @@
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.component.atom.AtomEntryPollingConsumer;
-import org.apache.camel.component.atom.AtomPollingConsumer;
import org.apache.camel.component.feed.FeedComponent;
-import org.apache.camel.component.feed.FeedConsumer;
import org.apache.camel.component.feed.FeedEndpoint;
+import org.apache.camel.component.feed.FeedPollingConsumer;
import org.apache.camel.impl.DefaultPollingEndpoint;
/**
@@ -69,12 +67,12 @@
}
@Override
- protected FeedConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate) {
+ protected FeedPollingConsumer createEntryPollingConsumer(FeedEndpoint feedEndpoint, Processor processor, boolean filter, Date lastUpdate) {
return new RssEntryPollingConsumer(this, processor, filter, lastUpdate);
}
@Override
- protected FeedConsumer createPollingConsumer(FeedEndpoint feedEndpoint, Processor processor) {
+ protected FeedPollingConsumer createPollingConsumer(FeedEndpoint feedEndpoint, Processor processor) {
return new RssPollingConsumer(this, processor);
}
}
Modified: activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEntryPollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEntryPollingConsumer.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEntryPollingConsumer.java (original)
+++ activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssEntryPollingConsumer.java Tue Dec 2 15:44:48 2008
@@ -17,54 +17,43 @@
package org.apache.camel.component.rss;
import java.util.Date;
-import java.util.List;
-import com.sun.syndication.feed.synd.SyndEntry;
import com.sun.syndication.feed.synd.SyndFeed;
-import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.component.feed.EntryFilter;
+import org.apache.camel.component.feed.FeedEntryPollingConsumer;
+
/**
* Consumer to poll RSS feeds and return each entry from the feed step by step.
*
*/
-public class RssEntryPollingConsumer extends RssPollingConsumer {
- private int entryIndex;
- private List<SyndEntry> list;
-
- public RssEntryPollingConsumer(RssEndpoint endpoint, Processor processor) {
- super(endpoint, processor);
- }
+public class RssEntryPollingConsumer extends FeedEntryPollingConsumer {
public RssEntryPollingConsumer(RssEndpoint endpoint, Processor processor, boolean filter, Date lastUpdate) {
- this(endpoint, processor);
- }
-
- public void poll() throws Exception {
- SyndFeed feed = createFeed();
- populateList(feed);
-
- while (hasNextEntry()) {
- SyndEntry entry = list.get(entryIndex--);
- Exchange exchange = endpoint.createExchange(feed, entry);
- getProcessor().process(exchange);
- // return and wait for the next poll to continue from last time (this consumer is stateful)
- return;
- }
-
- list = null;
+ super(endpoint, processor, filter, lastUpdate);
}
-
- private void populateList(SyndFeed feed) {
+
+ @Override
+ protected void populateList(Object feed) throws Exception {
if (list == null) {
- list = feed.getEntries();
+ list = ((SyndFeed)feed).getEntries();
entryIndex = list.size() - 1;
}
}
- private boolean hasNextEntry() {
- return entryIndex >= 0;
+ @Override
+ protected Object createFeed() throws Exception {
+ return RssUtils.createFeed(endpoint.getFeedUri());
}
+ @Override
+ protected void resetList() {
+ list = null;
+ }
+
+ protected EntryFilter createEntryFilter(Date lastUpdate) {
+ return new UpdatedDateFilter(lastUpdate);
+ }
}
Modified: activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssPollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssPollingConsumer.java?rev=722674&r1=722673&r2=722674&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssPollingConsumer.java (original)
+++ activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssPollingConsumer.java Tue Dec 2 15:44:48 2008
@@ -16,24 +16,26 @@
*/
package org.apache.camel.component.rss;
+import java.io.IOException;
+
import com.sun.syndication.feed.synd.SyndFeed;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.component.feed.FeedPollingConsumer;
/**
* Consumer to poll RSS feeds and return the full feed.
*/
-public class RssPollingConsumer extends RssConsumerSupport {
+public class RssPollingConsumer extends FeedPollingConsumer {
public RssPollingConsumer(RssEndpoint endpoint, Processor processor) {
super(endpoint, processor);
}
- protected void poll() throws Exception {
- SyndFeed feed = createFeed();
- Exchange exchange = endpoint.createExchange(feed);
- getProcessor().process(exchange);
+ @Override
+ protected Object createFeed() throws Exception {
+ return RssUtils.createFeed(endpoint.getFeedUri());
}
}
Added: activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssUtils.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssUtils.java?rev=722674&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssUtils.java (added)
+++ activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssUtils.java Tue Dec 2 15:44:48 2008
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rss;
+
+import java.io.InputStream;
+import java.net.URL;
+
+import com.sun.syndication.feed.synd.SyndFeed;
+import com.sun.syndication.io.SyndFeedInput;
+import com.sun.syndication.io.XmlReader;
+
+public final class RssUtils {
+ private RssUtils() {
+ // Helper class
+ }
+
+ public static SyndFeed createFeed(String feedUri) throws Exception {
+ InputStream in = new URL(feedUri).openStream();
+ SyndFeedInput input = new SyndFeedInput();
+ return input.build(new XmlReader(in));
+ }
+}
Propchange: activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/RssUtils.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/UpdatedDateFilter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/UpdatedDateFilter.java?rev=722674&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/UpdatedDateFilter.java (added)
+++ activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/UpdatedDateFilter.java Tue Dec 2 15:44:48 2008
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rss;
+
+import java.util.Date;
+
+import com.sun.syndication.feed.synd.SyndEntry;
+
+import org.apache.camel.component.feed.EntryFilter;
+import org.apache.camel.component.feed.FeedEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Filters out all entries which occur before the last time of the entry we saw (assuming
+ * entries arrive sorted in order).
+ *
+ * @version $Revision: 656106 $
+ */
+public class UpdatedDateFilter implements EntryFilter {
+
+ private static final transient Log LOG = LogFactory.getLog(UpdatedDateFilter.class);
+ private Date lastUpdate;
+
+ public UpdatedDateFilter(Date lastUpdate) {
+ this.lastUpdate = lastUpdate;
+ }
+
+ public boolean isValidEntry(FeedEndpoint endpoint, Object feed, Object entry) {
+ Date updated = ((SyndEntry)entry).getUpdatedDate();
+ if (updated == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No updated time for entry so assuming its valid: entry=[" + entry + "]");
+ }
+ return true;
+ }
+ if (lastUpdate != null) {
+ if (lastUpdate.after(updated)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Entry is older than lastupdate=[" + lastUpdate
+ + "], no valid entry=[" + entry + "]");
+ }
+ return false;
+ }
+ }
+ lastUpdate = updated;
+ return true;
+ }
+
+}
Propchange: activemq/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/UpdatedDateFilter.java
------------------------------------------------------------------------------
svn:eol-style = native