You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/07 19:43:40 UTC
[2/8] incubator-streams git commit: example of STREAMS-415 using
twitter
example of STREAMS-415 using twitter
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0813b11e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0813b11e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0813b11e
Branch: refs/heads/master
Commit: 0813b11edd535322cbabafd9a91e77136812e8bb
Parents: 9bf8ef9
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Tue Oct 4 17:37:06 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Tue Oct 4 17:37:24 2016 -0500
----------------------------------------------------------------------
.../provider/TwitterTimelineProvider.java | 40 ++++++++-
.../src/site/markdown/index.md | 18 ++++
.../provider/TwitterTimelineProviderIT.java | 93 ++++++++++++++++++++
.../provider/TwitterTimelineProviderTest.java | 39 --------
.../resources/TwitterTimelineProviderTest.conf | 4 +
5 files changed, 154 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index a8eada4..b8653b8 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -18,15 +18,23 @@
package org.apache.streams.twitter.provider;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -48,12 +56,40 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Retrieve recent posts from a list of user ids or names.
*/
-public class TwitterTimelineProvider implements StreamsProvider, Serializable {
+public class TwitterTimelineProvider implements StreamsProvider, Serializable, Runnable {
public final static String STREAMS_ID = "TwitterTimelineProvider";
private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class);
+ private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
+ public static void main(String[] args) {
+ TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration("twitter");
+ TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
+ provider.run();
+ }
+
+ @Override
+ public void run() {
+ prepare(config);
+ startStream();
+ do {
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ Iterator<StreamsDatum> iterator = readCurrent().iterator();
+ while(iterator.hasNext()) {
+ StreamsDatum datum = iterator.next();
+ String json;
+ try {
+ json = MAPPER.writeValueAsString(datum.getDocument());
+ System.out.println(json);
+ } catch (JsonProcessingException e) {
+ System.err.println(e.getMessage());
+ }
+ }
+ } while( isRunning());
+ }
+
public static final int MAX_NUMBER_WAITING = 10000;
private TwitterUserInformationConfiguration config;
@@ -116,6 +152,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
running.set(true);
executor.shutdown();
+
}
public boolean shouldContinuePulling(List<Status> statuses) {
@@ -304,4 +341,5 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
lock.readLock().unlock();
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/site/markdown/index.md b/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
index ec5d1c8..4249956 100644
--- a/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
+++ b/streams-contrib/streams-provider-twitter/src/site/markdown/index.md
@@ -31,6 +31,24 @@ streams-provider-twitter contains schema definitions, providers, conversions, an
| TwitterStreamProvider [TwitterStreamProvider.html](apidocs/org/apache/streams/twitter/TwitterStreamProvider.html "javadoc") | [TwitterStreamConfiguration.json](com/twitter/TwitterStreamConfiguration.json "TwitterStreamConfiguration.json") [TwitterUserInformationConfiguration.html](apidocs/org/apache/streams/twitter/pojo/TwitterStreamConfiguration.html "javadoc") | [sample.conf](sample.conf "sample.conf")<br/>[userstream.conf](userstream.conf "userstream.conf") |
| TwitterFollowingProvider [TwitterFollowingProvider.html](apidocs/org/apache/streams/twitter/TwitterFollowingConfiguration.html "javadoc") | [TwitterFollowingConfiguration.json](com/twitter/TwitterFollowingConfiguration.json "TwitterFollowingConfiguration.json") [TwitterFollowingConfiguration.html](apidocs/org/apache/streams/twitter/pojo/TwitterFollowingConfiguration.html "javadoc") | [friends.conf](friends.conf "friends.conf")<br/>[followers.conf](followers.conf "followers.conf") |
+Test:
+-----
+
+Create a local file `application.conf` with valid twitter credentials
+
+ twitter {
+ oauth {
+ consumerKey = ""
+ consumerSecret = ""
+ accessToken = ""
+ accessTokenSecret = ""
+ }
+ }
+
+Build with integration testing enabled, using your credentials
+
+ mvn clean test verify -DskipITs=false -DargLine="-Dconfig.file=`pwd`/application.conf"
+
[JavaDocs](apidocs/index.html "JavaDocs")
###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
new file mode 100644
index 0000000..e0f3b6a
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.junit.Test;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.List;
+
+public class TwitterTimelineProviderIT {
+
+ @Test
+ public void testTwitterTimelineProvider() throws Exception {
+
+ PrintStream stdout = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stdout.txt")));
+ PrintStream stderr = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stderr.txt")));
+
+ System.setOut(stdout);
+ System.setErr(stderr);
+
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/TwitterTimelineProviderTest.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe);
+ TwitterUserInformationConfiguration testConfig = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe.getConfig("twitter"));
+
+ TwitterTimelineProvider provider = new TwitterTimelineProvider(testConfig);
+ provider.run();
+
+ stdout.flush();
+ stderr.flush();
+
+ File out = new File("target/test-classes/TwitterTimelineProviderTest.stdout.txt");
+ assert (out.exists());
+ assert (out.canRead());
+ assert (out.isFile());
+
+ FileReader outReader = new FileReader(out);
+ LineNumberReader outCounter = new LineNumberReader(outReader);
+
+ while(outCounter.readLine() != null) {}
+
+ assert (outCounter.getLineNumber() == 1000);
+
+ File err = new File("target/test-classes/TwitterTimelineProviderTest.stderr.txt");
+ assert (err.exists());
+ assert (err.canRead());
+ assert (err.isFile());
+
+ FileReader errReader = new FileReader(err);
+ LineNumberReader errCounter = new LineNumberReader(errReader);
+
+ while(errCounter.readLine() != null) {}
+
+ assert (errCounter.getLineNumber() == 0);
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java
deleted file mode 100644
index 0cdede0..0000000
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import org.apache.streams.twitter.TwitterUserInformationConfiguration;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class TwitterTimelineProviderTest {
-
- @Test
- public void consolidateToIDsTest() {
- List<String> ids = Arrays.asList("2342342", "", "144523", null);
-
- TwitterUserInformationConfiguration twitterUserInformationConfiguration = new TwitterUserInformationConfiguration();
- twitterUserInformationConfiguration.setInfo(ids);
- TwitterTimelineProvider twitterTimelineProvider = new TwitterTimelineProvider(twitterUserInformationConfiguration);
-
- twitterTimelineProvider.consolidateToIDs();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
new file mode 100644
index 0000000..a7862c4
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf
@@ -0,0 +1,4 @@
+twitter.info = [
+ 18055613
+]
+twitter.max_items = 1000
\ No newline at end of file