You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/07 19:43:40 UTC

[2/8] incubator-streams git commit: example of STREAMS-415 using twitter

example of STREAMS-415 using twitter


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0813b11e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0813b11e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0813b11e

Branch: refs/heads/master
Commit: 0813b11edd535322cbabafd9a91e77136812e8bb
Parents: 9bf8ef9
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Tue Oct 4 17:37:06 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Tue Oct 4 17:37:24 2016 -0500

----------------------------------------------------------------------
 .../provider/TwitterTimelineProvider.java       | 40 ++++++++-
 .../src/site/markdown/index.md                  | 18 ++++
 .../provider/TwitterTimelineProviderIT.java     | 93 ++++++++++++++++++++
 .../provider/TwitterTimelineProviderTest.java   | 39 --------
 .../resources/TwitterTimelineProviderTest.conf  |  4 +
 5 files changed, 154 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index a8eada4..b8653b8 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -18,15 +18,23 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -48,12 +56,40 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 /**
  *  Retrieve recent posts from a list of user ids or names.
  */
-public class TwitterTimelineProvider implements StreamsProvider, Serializable {
+public class TwitterTimelineProvider implements StreamsProvider, Serializable, Runnable {
 
     public final static String STREAMS_ID = "TwitterTimelineProvider";
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
 
+    private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
+    public static void main(String[] args) {
+        TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration("twitter");
+        TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
+        provider.run();
+    }
+
+    @Override
+    public void run() {
+        prepare(config);
+        startStream();
+        do {
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+            Iterator<StreamsDatum> iterator = readCurrent().iterator();
+            while(iterator.hasNext()) {
+                StreamsDatum datum = iterator.next();
+                String json;
+                try {
+                    json = MAPPER.writeValueAsString(datum.getDocument());
+                    System.out.println(json);
+                } catch (JsonProcessingException e) {
+                    System.err.println(e.getMessage());
+                }
+            }
+        } while( isRunning());
+    }
+
     public static final int MAX_NUMBER_WAITING = 10000;
 
     private TwitterUserInformationConfiguration config;
@@ -116,6 +152,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
         running.set(true);
 
         executor.shutdown();
+
     }
 
     public boolean shouldContinuePulling(List<Status> statuses) {
@@ -304,4 +341,5 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
             lock.readLock().unlock();
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/site/markdown/index.md b/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
index ec5d1c8..4249956 100644
--- a/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
+++ b/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
@@ -31,6 +31,24 @@ streams-provider-twitter contains schema definitions, providers, conversions, an
 | TwitterStreamProvider [TwitterStreamProvider.html](apidocs/org/apache/streams/twitter/TwitterStreamProvider.html "javadoc") | [TwitterStreamConfiguration.json](com/twitter/TwitterStreamConfiguration.json "TwitterStreamConfiguration.json") [TwitterUserInformationConfiguration.html](apidocs/org/apache/streams/twitter/pojo/TwitterStreamConfiguration.html "javadoc") | [sample.conf](sample.conf "sample.conf")<br/>[userstream.conf](userstream.conf "userstream.conf") |
 | TwitterFollowingProvider [TwitterFollowingProvider.html](apidocs/org/apache/streams/twitter/TwitterFollowingConfiguration.html "javadoc") | [TwitterFollowingConfiguration.json](com/twitter/TwitterFollowingConfiguration.json "TwitterFollowingConfiguration.json") [TwitterFollowingConfiguration.html](apidocs/org/apache/streams/twitter/pojo/TwitterFollowingConfiguration.html "javadoc") | [friends.conf](friends.conf "friends.conf")<br/>[followers.conf](followers.conf "followers.conf") |
 
+Test:
+-----
+
+Create a local file `application.conf` with valid twitter credentials
+
+    twitter {
+      oauth {
+        consumerKey = ""
+        consumerSecret = ""
+        accessToken = ""
+        accessTokenSecret = ""
+      }
+    }
+    
+Build with integration testing enabled, using your credentials
+
+    mvn clean test verify -DskipITs=false -DargLine="-Dconfig.file=`pwd`/application.conf"
+
 [JavaDocs](apidocs/index.html "JavaDocs")
 
 ###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
new file mode 100644
index 0000000..e0f3b6a
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.junit.Test;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.List;
+
+public class TwitterTimelineProviderIT {
+
+    @Test
+    public void testTwitterTimelineProvider() throws Exception {
+
+        PrintStream stdout = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stdout.txt")));
+        PrintStream stderr = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stderr.txt")));
+
+        System.setOut(stdout);
+        System.setErr(stderr);
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File("target/test-classes/TwitterTimelineProviderTest.conf");
+        assert(conf_file.exists());
+        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+        StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
+        TwitterUserInformationConfiguration testConfig = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe.getConfig("twitter"));
+
+        TwitterTimelineProvider provider = new TwitterTimelineProvider(testConfig);
+        provider.run();
+
+        stdout.flush();
+        stderr.flush();
+
+        File out = new File("target/test-classes/TwitterTimelineProviderTest.stdout.txt");
+        assert (out.exists());
+        assert (out.canRead());
+        assert (out.isFile());
+
+        FileReader outReader = new FileReader(out);
+        LineNumberReader outCounter = new LineNumberReader(outReader);
+
+        while(outCounter.readLine() != null) {}
+
+        assert (outCounter.getLineNumber() == 1000);
+
+        File err = new File("target/test-classes/TwitterTimelineProviderTest.stderr.txt");
+        assert (err.exists());
+        assert (err.canRead());
+        assert (err.isFile());
+
+        FileReader errReader = new FileReader(err);
+        LineNumberReader errCounter = new LineNumberReader(errReader);
+
+        while(errCounter.readLine() != null) {}
+
+        assert (errCounter.getLineNumber() == 0);
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java
deleted file mode 100644
index 0cdede0..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import org.apache.streams.twitter.TwitterUserInformationConfiguration;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class TwitterTimelineProviderTest {
-
-    @Test
-    public void consolidateToIDsTest() {
-        List<String> ids = Arrays.asList("2342342", "", "144523", null);
-
-        TwitterUserInformationConfiguration twitterUserInformationConfiguration = new TwitterUserInformationConfiguration();
-        twitterUserInformationConfiguration.setInfo(ids);
-        TwitterTimelineProvider twitterTimelineProvider = new TwitterTimelineProvider(twitterUserInformationConfiguration);
-
-        twitterTimelineProvider.consolidateToIDs();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
new file mode 100644
index 0000000..a7862c4
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
@@ -0,0 +1,4 @@
+twitter.info = [
+  18055613
+]
+twitter.max_items = 1000
\ No newline at end of file