You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/19 20:43:11 UTC
[1/2] incubator-streams git commit: level up rss provider
Repository: incubator-streams
Updated Branches:
refs/heads/master dd58c877b -> cae682d6e
level up rss provider
add main methods to each Provider (STREAMS-412)
add real integration tests (STREAMS-415)
fix a failing test
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/427faf53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/427faf53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/427faf53
Branch: refs/heads/master
Commit: 427faf5378b3de99df81604dd4ec856c89766224
Parents: 2c12724
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Mon Oct 17 11:44:20 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Mon Oct 17 11:44:20 2016 -0500
----------------------------------------------------------------------
.../streams/rss/provider/RssStreamProvider.java | 59 +++++++++
.../src/site/markdown/index.md | 7 ++
.../rss/provider/RssStreamProviderTaskIT.java | 2 +-
.../streams/rss/test/RssStreamProviderIT.java | 121 +++++++++++++++++++
4 files changed, 188 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/427faf53/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
index e444ed3..a0e8ea1 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
@@ -18,22 +18,30 @@
package org.apache.streams.rss.provider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.sun.syndication.feed.synd.SyndEntry;
import com.sun.syndication.feed.synd.SyndFeed;
import com.sun.syndication.io.FeedException;
import com.sun.syndication.io.SyndFeedInput;
import com.sun.syndication.io.XmlReader;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.rss.FeedDetails;
import org.apache.streams.rss.RssStreamConfiguration;
import org.apache.streams.rss.provider.perpetual.RssFeedScheduler;
@@ -42,7 +50,11 @@ import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.PrintStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.net.MalformedURLException;
@@ -52,7 +64,15 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
+ * RSS {@link org.apache.streams.core.StreamsProvider} that provides content from rss feeds in boilerpipe format
*
+ * To use from command line:
+ *
+ * Supply configuration similar to src/test/resources/rss.conf
+ *
+ * Launch using:
+ *
+ * mvn exec:java -Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider -Dexec.args="rss.conf articles.json"
*/
public class RssStreamProvider implements StreamsProvider {
@@ -177,5 +197,44 @@ public class RssStreamProvider implements StreamsProvider {
ComponentUtils.shutdownExecutor(this.executor, 10, 10);
}
+ public static void main(String[] args) throws Exception {
+
+ Preconditions.checkArgument(args.length >= 2);
+
+ String configfile = args[0];
+ String outfile = args[1];
+
+ Config reference = ConfigFactory.load();
+ File conf_file = new File(configfile);
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+
+ StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+ RssStreamConfiguration config = new ComponentConfigurator<>(RssStreamConfiguration.class).detectConfiguration(typesafe, "rss");
+ RssStreamProvider provider = new RssStreamProvider(config);
+
+ ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+ provider.prepare(config);
+ provider.startStream();
+ do {
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+ Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+ while(iterator.hasNext()) {
+ StreamsDatum datum = iterator.next();
+ String json;
+ try {
+ json = mapper.writeValueAsString(datum.getDocument());
+ outStream.println(json);
+ } catch (JsonProcessingException e) {
+ System.err.println(e.getMessage());
+ }
+ }
+ } while( provider.isRunning());
+ provider.cleanUp();
+ outStream.flush();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/427faf53/streams-contrib/streams-provider-rss/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/site/markdown/index.md b/streams-contrib/streams-provider-rss/src/site/markdown/index.md
index 3a4a561..8563909 100644
--- a/streams-contrib/streams-provider-rss/src/site/markdown/index.md
+++ b/streams-contrib/streams-provider-rss/src/site/markdown/index.md
@@ -15,6 +15,13 @@ streams-provider-rss
|-------|---------------|--------------------------|
| RssStreamProvider [RssStreamProvider.html](apidocs/org/apache/streams/rss/provider/RssStreamProvider.html "javadoc") | [RssStreamConfiguration.json](RssStreamConfiguration.json "RssStreamConfiguration.json") [RssStreamConfiguration.html](apidocs/org/apache/streams/rss/RssStreamConfiguration.html "javadoc") | [rss.conf](rss.conf "rss.conf") |
+Test:
+-----
+
+Build with integration testing enabled
+
+ mvn clean test verify -DskipITs=false
+
[JavaDocs](apidocs/index.html "JavaDocs")
###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/427faf53/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java
index a838c78..cb71c90 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java
@@ -73,7 +73,7 @@ public class RssStreamProviderTaskIT {
public void testPerpetualNoTimeFramePull() throws Exception {
com.healthmarketscience.common.util.resource.Handler.init();
BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
- RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", new DateTime().minusYears(1), 10000, true);
+ RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", new DateTime().minusYears(5), 10000, true);
Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
assertEquals("Expected batch size to be the same as amount of queued datums", batch.size(), queue.size());
RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/427faf53/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
new file mode 100644
index 0000000..ccac8aa
--- /dev/null
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.rss.test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.rss.FeedDetails;
+import org.apache.streams.rss.RssStreamConfiguration;
+import org.apache.streams.rss.provider.RssStreamProvider;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.List;
+
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+
+/**
+ * Created by sblackmon on 2/5/14.
+ */
+public class RssStreamProviderIT {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderIT.class);
+
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ @Test
+ public void testRssStreamProvider() throws Exception {
+
+ String configfile = "./target/test-classes/RssStreamProviderIT.conf";
+ String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt";
+
+ InputStream is = RssStreamProviderIT.class.getResourceAsStream("/top100.txt");
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+
+ RssStreamConfiguration configuration = new RssStreamConfiguration();
+ List<FeedDetails> feedArray = Lists.newArrayList();
+ try {
+ while (br.ready()) {
+ String line = br.readLine();
+ if(!StringUtils.isEmpty(line))
+ {
+ feedArray.add(new FeedDetails().withUrl(line).withPollIntervalMillis(5000l));
+ }
+ }
+ configuration.setFeeds(feedArray);
+ } catch( Exception e ) {
+ System.out.println(e);
+ e.printStackTrace();
+ Assert.fail();
+ }
+
+ Assert.assertThat(configuration.getFeeds().size(), greaterThan(70));
+
+ OutputStream os = new FileOutputStream(configfile);
+ OutputStreamWriter osw = new OutputStreamWriter(os);
+ BufferedWriter bw = new BufferedWriter(osw);
+
+ // write conf
+ ObjectNode feedsNode = mapper.convertValue(configuration, ObjectNode.class);
+ JsonNode configNode = mapper.createObjectNode().set("rss", feedsNode);
+
+ bw.write(mapper.writeValueAsString(configNode));
+ bw.flush();
+ bw.close();
+
+ File config = new File(configfile);
+ assert (config.exists());
+ assert (config.canRead());
+ assert (config.isFile());
+
+ RssStreamProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+
+ File out = new File(outfile);
+ assert (out.exists());
+ assert (out.canRead());
+ assert (out.isFile());
+
+ FileReader outReader = new FileReader(out);
+ LineNumberReader outCounter = new LineNumberReader(outReader);
+
+ while(outCounter.readLine() != null) {}
+
+ assert (outCounter.getLineNumber() >= 200);
+
+ }
+}
\ No newline at end of file
[2/2] incubator-streams git commit: Merge branch '0.4-rss'
Posted by sb...@apache.org.
Merge branch '0.4-rss'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cae682d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cae682d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cae682d6
Branch: refs/heads/master
Commit: cae682d6e9a33ba37bfc25e271761e11372866f4
Parents: dd58c87 427faf5
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Wed Oct 19 15:41:37 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Wed Oct 19 15:41:37 2016 -0500
----------------------------------------------------------------------
.../streams/rss/provider/RssStreamProvider.java | 59 +++++++++
.../src/site/markdown/index.md | 7 ++
.../rss/provider/RssStreamProviderTaskIT.java | 2 +-
.../streams/rss/test/RssStreamProviderIT.java | 121 +++++++++++++++++++
4 files changed, 188 insertions(+), 1 deletion(-)
----------------------------------------------------------------------