You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/19 20:43:11 UTC

[1/2] incubator-streams git commit: level up rss provider

Repository: incubator-streams
Updated Branches:
  refs/heads/master dd58c877b -> cae682d6e


level up rss provider

add main methods to each Provider (STREAMS-412)
add real integration tests (STREAMS-415)
fix a failing test (RssStreamProviderTaskIT.testPerpetualNoTimeFramePull now looks back 5 years instead of 1)


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/427faf53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/427faf53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/427faf53

Branch: refs/heads/master
Commit: 427faf5378b3de99df81604dd4ec856c89766224
Parents: 2c12724
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Mon Oct 17 11:44:20 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Mon Oct 17 11:44:20 2016 -0500

----------------------------------------------------------------------
 .../streams/rss/provider/RssStreamProvider.java |  59 +++++++++
 .../src/site/markdown/index.md                  |   7 ++
 .../rss/provider/RssStreamProviderTaskIT.java   |   2 +-
 .../streams/rss/test/RssStreamProviderIT.java   | 121 +++++++++++++++++++
 4 files changed, 188 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/427faf53/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
index e444ed3..a0e8ea1 100644
--- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
+++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java
@@ -18,22 +18,30 @@
 
 package org.apache.streams.rss.provider;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.sun.syndication.feed.synd.SyndEntry;
 import com.sun.syndication.feed.synd.SyndFeed;
 import com.sun.syndication.io.FeedException;
 import com.sun.syndication.io.SyndFeedInput;
 import com.sun.syndication.io.XmlReader;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.rss.FeedDetails;
 import org.apache.streams.rss.RssStreamConfiguration;
 import org.apache.streams.rss.provider.perpetual.RssFeedScheduler;
@@ -42,7 +50,11 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.net.MalformedURLException;
@@ -52,7 +64,15 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
+ * RSS {@link org.apache.streams.core.StreamsProvider} that provides content from rss feeds in boilerpipe format
  *
+ *  To use from command line:
+ *
+ *  Supply configuration similar to src/test/resources/rss.conf
+ *
+ *  Launch using:
+ *
+ *  mvn exec:java -Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider -Dexec.args="rss.conf articles.json"
  */
 public class RssStreamProvider implements StreamsProvider {
 
@@ -177,5 +197,44 @@ public class RssStreamProvider implements StreamsProvider {
         ComponentUtils.shutdownExecutor(this.executor, 10, 10);
     }
 
+    public static void main(String[] args) throws Exception {
+        // Usage: RssStreamProvider <config file> <output file>; writes one JSON document per line.
+        Preconditions.checkArgument(args.length >= 2, "usage: RssStreamProvider <configfile> <outfile>");
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        Preconditions.checkArgument(conf_file.exists(), "config file not found: %s", configfile);
+        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));

+        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+        RssStreamConfiguration config = new ComponentConfigurator<>(RssStreamConfiguration.class).detectConfiguration(typesafe, "rss");
+        RssStreamProvider provider = new RssStreamProvider(config);
+        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+        // try-with-resources closes (and therefore flushes) the output file even if polling throws.
+        try (PrintStream outStream = new PrintStream(
+                new BufferedOutputStream(new FileOutputStream(outfile)))) {
+            provider.prepare(config);
+            provider.startStream();
+            // Each batch interval, write out provider.readCurrent() until the provider stops running.
+            do {
+                Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+                Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+                while (iterator.hasNext()) {
+                    StreamsDatum datum = iterator.next();
+                    try {
+                        outStream.println(mapper.writeValueAsString(datum.getDocument()));
+                    } catch (JsonProcessingException e) {
+                        System.err.println(e.getMessage());
+                    }
+                }
+            } while (provider.isRunning());
+            provider.cleanUp();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/427faf53/streams-contrib/streams-provider-rss/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/site/markdown/index.md b/streams-contrib/streams-provider-rss/src/site/markdown/index.md
index 3a4a561..8563909 100644
--- a/streams-contrib/streams-provider-rss/src/site/markdown/index.md
+++ b/streams-contrib/streams-provider-rss/src/site/markdown/index.md
@@ -15,6 +15,13 @@ streams-provider-rss
 |-------|---------------|--------------------------|
 | RssStreamProvider [RssStreamProvider.html](apidocs/org/apache/streams/rss/provider/RssStreamProvider.html "javadoc") | [RssStreamConfiguration.json](RssStreamConfiguration.json "RssStreamConfiguration.json") [RssStreamConfiguration.html](apidocs/org/apache/streams/rss/RssStreamConfiguration.html "javadoc") | [rss.conf](rss.conf "rss.conf") |
 
+Test:
+-----
+
+Build with integration testing enabled
+
+    mvn clean test verify -DskipITs=false
+    
 [JavaDocs](apidocs/index.html "JavaDocs")
 
 ###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/427faf53/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java
index a838c78..cb71c90 100644
--- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java
@@ -73,7 +73,7 @@ public class RssStreamProviderTaskIT {
     public void testPerpetualNoTimeFramePull() throws Exception {
         com.healthmarketscience.common.util.resource.Handler.init();
         BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
-        RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", new DateTime().minusYears(1), 10000, true);
+        RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", new DateTime().minusYears(5), 10000, true);
         Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml"));
         assertEquals("Expected batch size to be the same as amount of queued datums", batch.size(), queue.size());
         RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/427faf53/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
new file mode 100644
index 0000000..ccac8aa
--- /dev/null
+++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.rss.test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.rss.FeedDetails;
+import org.apache.streams.rss.RssStreamConfiguration;
+import org.apache.streams.rss.provider.RssStreamProvider;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.List;
+
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+
+/**
+ * Created by sblackmon on 2/5/14.
+ */
+public class RssStreamProviderIT {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderIT.class);
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    @Test
+    public void testRssStreamProvider() throws Exception {
+
+        String configfile = "./target/test-classes/RssStreamProviderIT.conf";
+        String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt";
+
+        InputStream is = RssStreamProviderIT.class.getResourceAsStream("/top100.txt");
+        InputStreamReader isr = new InputStreamReader(is);
+        BufferedReader br = new BufferedReader(isr);
+
+        RssStreamConfiguration configuration = new RssStreamConfiguration();
+        List<FeedDetails> feedArray = Lists.newArrayList();
+        try {
+            while (br.ready()) {
+                String line = br.readLine();
+                if(!StringUtils.isEmpty(line))
+                {
+                    feedArray.add(new FeedDetails().withUrl(line).withPollIntervalMillis(5000l));
+                }
+            }
+            configuration.setFeeds(feedArray);
+        } catch( Exception e ) {
+            System.out.println(e);
+            e.printStackTrace();
+            Assert.fail();
+        }
+
+        Assert.assertThat(configuration.getFeeds().size(), greaterThan(70));
+
+        OutputStream os = new FileOutputStream(configfile);
+        OutputStreamWriter osw = new OutputStreamWriter(os);
+        BufferedWriter bw = new BufferedWriter(osw);
+
+        // write conf
+        ObjectNode feedsNode = mapper.convertValue(configuration, ObjectNode.class);
+        JsonNode configNode = mapper.createObjectNode().set("rss", feedsNode);
+
+        bw.write(mapper.writeValueAsString(configNode));
+        bw.flush();
+        bw.close();
+
+        File config = new File(configfile);
+        assert (config.exists());
+        assert (config.canRead());
+        assert (config.isFile());
+
+        RssStreamProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+
+        File out = new File(outfile);
+        assert (out.exists());
+        assert (out.canRead());
+        assert (out.isFile());
+
+        FileReader outReader = new FileReader(out);
+        LineNumberReader outCounter = new LineNumberReader(outReader);
+
+        while(outCounter.readLine() != null) {}
+
+        assert (outCounter.getLineNumber() >= 200);
+
+    }
+}
\ No newline at end of file


[2/2] incubator-streams git commit: Merge branch '0.4-rss'

Posted by sb...@apache.org.
Merge branch '0.4-rss'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cae682d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cae682d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cae682d6

Branch: refs/heads/master
Commit: cae682d6e9a33ba37bfc25e271761e11372866f4
Parents: dd58c87 427faf5
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Wed Oct 19 15:41:37 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Wed Oct 19 15:41:37 2016 -0500

----------------------------------------------------------------------
 .../streams/rss/provider/RssStreamProvider.java |  59 +++++++++
 .../src/site/markdown/index.md                  |   7 ++
 .../rss/provider/RssStreamProviderTaskIT.java   |   2 +-
 .../streams/rss/test/RssStreamProviderIT.java   | 121 +++++++++++++++++++
 4 files changed, 188 insertions(+), 1 deletion(-)
----------------------------------------------------------------------