You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/06/03 12:22:25 UTC
flink git commit: Implemented TwitterSourceFilter and adapted
TwitterSource
Repository: flink
Updated Branches:
refs/heads/master bc7f6d9e5 -> bf9cc81a7
Implemented TwitterSourceFilter and adapted TwitterSource
renamed TwitterFilterSourceTest to TwitterFilterSourceExample and added
JavaDoc comments.
Added a dummy twitter.properties to the resource file
fixed TwitterSource.java
This closes #695
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf9cc81a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf9cc81a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf9cc81a
Branch: refs/heads/master
Commit: bf9cc81a79951ebac40b00a5d8f67128a86efe4e
Parents: bc7f6d9
Author: yildirim <yi...@gronau.neofonie.priv>
Authored: Tue May 19 14:27:27 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jun 3 12:21:51 2015 +0200
----------------------------------------------------------------------
.../connectors/twitter/TwitterFilterSource.java | 280 +++++++++++++++++++
.../twitter/TwitterFilterSourceExample.java | 68 +++++
.../connectors/twitter/TwitterSource.java | 132 ++-------
.../src/main/resources/twitter.properties | 19 ++
4 files changed, 387 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bf9cc81a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
new file mode 100644
index 0000000..8dd4458
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.hbc.core.endpoint.Location;
+import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
+import com.twitter.hbc.httpclient.auth.Authentication;
+
+/**
+ *
+ * An extension of {@link TwitterSource} by filter parameters. This extension
+ * enables to filter the twitter stream by user defined parameters.
+ */
+public class TwitterFilterSource extends TwitterSource {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TwitterFilterSource.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private List<String> trackTerms = new LinkedList<String>();
+
+ private List<String> languages = new LinkedList<String>();
+
+ private List<Long> followings = new LinkedList<Long>();
+
+ private List<Location> locations = new LinkedList<Location>();
+
+ private Map<String, String> queryParameters = new HashMap<String, String>();
+
+ private Map<String, String> postParameters = new HashMap<String, String>();
+
+ public TwitterFilterSource(String authPath) {
+ super(authPath);
+ }
+
+ @Override
+ protected void initializeConnection() {
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Initializing Twitter Streaming API connection");
+ }
+ queue = new LinkedBlockingQueue<String>(queueSize);
+
+ StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
+ configEndpoint(endpoint);
+ endpoint.stallWarnings(false);
+
+ Authentication auth = authenticate();
+
+ initializeClient(endpoint, auth);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Twitter Streaming API connection established successfully");
+ }
+ }
+
+ /**
+ * This function configures the streaming endpoint
+ *
+ * @param endpoint
+ * The streaming endpoint
+ */
+ private void configEndpoint(StatusesFilterEndpoint endpoint) {
+ if (!trackTerms.isEmpty()) {
+ endpoint.trackTerms(trackTerms);
+ }
+ if (!languages.isEmpty()) {
+ endpoint.languages(languages);
+ }
+ if (!followings.isEmpty()) {
+ endpoint.followings(followings);
+ }
+ if (!locations.isEmpty()) {
+ endpoint.locations(locations);
+ }
+ if (!queryParameters.isEmpty()) {
+ for (Entry<String, String> entry : queryParameters.entrySet()) {
+ endpoint.addQueryParameter(entry.getKey(), entry.getValue());
+ }
+ }
+ if (!postParameters.isEmpty()) {
+ for (Entry<String, String> entry : postParameters.entrySet()) {
+ endpoint.addPostParameter(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ /**
+ * This function sets which term to track.
+ *
+ * @param term
+ * The term to track.
+ */
+ public void trackTerm(String term) {
+ this.trackTerms.add(term);
+ }
+
+ /**
+ * This function sets which terms to track.
+ *
+ * @param terms
+ * The terms to track.
+ */
+ public void trackTerms(Collection<String> terms) {
+ this.trackTerms.addAll(terms);
+ }
+
+ /**
+ * This function tells which terms are tracked.
+ */
+ public List<String> getTrackTerms() {
+ return this.trackTerms;
+ }
+
+ /**
+ * This function sets which language to filter.
+ *
+ * @param language
+ * The language to filter.
+ */
+ public void filterLanguage(String language) {
+ this.languages.add(language);
+ }
+
+ /**
+ * This function sets which languages to filter.
+ *
+ * @param languages
+ * The languages to filter.
+ */
+ public void filterLanguages(Collection<String> languages) {
+ this.languages.addAll(languages);
+ }
+
+ /**
+ * This function tells which languages are filtered.
+ */
+ public List<String> getLanguages() {
+ return this.languages;
+ }
+
+ /**
+ * This function sets which user to follow.
+ *
+ * @param userID
+ * The ID of the user to follow.
+ */
+ public void filterFollowings(Long userID) {
+ this.followings.add(userID);
+ }
+
+ /**
+ * This function sets which users to follow.
+ *
+ * @param userIDs
+ * The IDs of the users to follow.
+ */
+ public void filterFollowings(Collection<Long> userIDs) {
+ this.followings.addAll(userIDs);
+ }
+
+ /**
+ * This function tells which users are followed.
+ */
+ public List<Long> getFollowings() {
+ return this.followings;
+ }
+
+ /**
+ * This function sets which location to filter.
+ *
+ * @param location
+ * The location to filter.
+ */
+ public void filterLocation(Location location) {
+ this.locations.add(location);
+ }
+
+ /**
+ * This function sets which locations to filter.
+ *
+ * @param locations
+ * The locations to filter.
+ */
+ public void filterLocations(Collection<Location> locations) {
+ this.locations.addAll(locations);
+ }
+
+ /**
+ * This function tells which locations are filtered.
+ */
+ public List<Location> getLocations() {
+ return this.locations;
+ }
+
+ /**
+ * This function sets a query parameter.
+ *
+ * @param parameter
+ * The name of the query parameter.
+ * @param value
+ * The value of the query parameter.
+ */
+ public void addQueryParameter(String parameter, String value) {
+ this.queryParameters.put(parameter, value);
+ }
+
+ /**
+ * This function sets query parameters.
+ *
+ * @param queryParameters
+ * The query parameters for the endpoint.
+ */
+ public void addQueryParameters(Map<String, String> queryParameters) {
+ this.queryParameters.putAll(queryParameters);
+ }
+
+ /**
+ * This function tells which query parameters are used by the endpoint.
+ */
+ public Map<String, String> getQueryParameters() {
+ return this.queryParameters;
+ }
+
+ /**
+ * This function sets a post parameter.
+ *
+ * @param parameter
+ * The name of the post parameter.
+ * @param value
+ * The value of the post parameter.
+ */
+ public void addPostParameter(String parameter, String value) {
+ this.postParameters.put(parameter, value);
+ }
+
+ /**
+ * This function sets post parameters.
+ *
+ * @param postParameters
+ * The post parameters for the endpoint.
+ */
+ public void addPostParameters(Map<String, String> postParameters) {
+ this.postParameters.putAll(postParameters);
+ }
+
+ /**
+ * This function tells which post parameters are used by the endpoint.
+ */
+ public Map<String, String> postParameters() {
+ return this.postParameters;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bf9cc81a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java
new file mode 100644
index 0000000..43cb179
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+
+/**
+ * This is an example how to use TwitterFilterSource. Before executing the
+ * example you have to define the access keys of twitter.properties in the
+ * resource folder. The access keys can be found in your twitter account.
+ */
+public class TwitterFilterSourceExample {
+
+ /**
+ * path to the twitter properties
+ */
+ private static final String PATH_TO_AUTH_FILE = "/twitter.properties";
+
+ public static void main(String[] args) {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .getExecutionEnvironment();
+
+ TwitterFilterSource twitterSource = new TwitterFilterSource(
+ TwitterFilterSourceExample.class.getResource(PATH_TO_AUTH_FILE)
+ .getFile());
+
+ twitterSource.trackTerm("obama");
+ twitterSource.filterLanguage("en");
+
+ DataStream<String> streamSource = env.addSource(twitterSource).flatMap(
+ new JSONParseFlatMap<String, String>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(String s, Collector<String> c)
+ throws Exception {
+ c.collect(s);
+ }
+ });
+
+ streamSource.print();
+
+ try {
+ env.execute("Twitter Streaming Test");
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bf9cc81a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index 78e4aa5..0b47985 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -22,17 +22,16 @@ import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.endpoint.DefaultStreamingEndpoint;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
@@ -41,26 +40,22 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
/**
* Implementation of {@link SourceFunction} specialized to emit tweets from
- * Twitter. It can connect to Twitter Streaming API, collect tweets and
+ * Twitter.
*/
-public class TwitterSource extends RichParallelSourceFunction<String> {
+public class TwitterSource extends RichSourceFunction<String> {
private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
private static final long serialVersionUID = 1L;
private String authPath;
- private transient BlockingQueue<String> queue;
- private int queueSize = 10000;
+ protected transient BlockingQueue<String> queue;
+ protected int queueSize = 10000;
private transient BasicClient client;
private int waitSec = 5;
private int maxNumberOfTweets;
private int currentNumberOfTweets;
- private String nextElement = null;
-
- private volatile boolean isRunning = false;
-
/**
* Create {@link TwitterSource} for streaming
*
@@ -96,7 +91,7 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
/**
* Initialize Hosebird Client to be able to consume Twitter's Streaming API
*/
- private void initializeConnection() {
+ protected void initializeConnection() {
if (LOG.isInfoEnabled()) {
LOG.info("Initializing Twitter Streaming API connection");
@@ -116,10 +111,10 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
}
}
- private OAuth1 authenticate() {
+ protected OAuth1 authenticate() {
Properties authenticationProperties = loadAuthenticationProperties();
-
+
return new OAuth1(authenticationProperties.getProperty("consumerKey"),
authenticationProperties.getProperty("consumerSecret"),
authenticationProperties.getProperty("token"),
@@ -132,6 +127,7 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
* @return the authentication data.
*/
private Properties loadAuthenticationProperties() {
+
Properties properties = new Properties();
try {
InputStream input = new FileInputStream(authPath);
@@ -143,82 +139,15 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
return properties;
}
- private void initializeClient(StatusesSampleEndpoint endpoint, Authentication auth) {
+ protected void initializeClient(DefaultStreamingEndpoint endpoint, Authentication auth) {
client = new ClientBuilder().name("twitterSourceClient").hosts(Constants.STREAM_HOST)
.endpoint(endpoint).authentication(auth)
.processor(new StringDelimitedProcessor(queue)).build();
-
+
client.connect();
}
- /**
- * Put tweets into output
- *
- * @param collector
- * Collector in which the tweets are collected.
- */
- protected void collectFiniteMessages(Collector<String> collector) {
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Collecting tweets");
- }
-
- for (int i = 0; i < maxNumberOfTweets; i++) {
- collectOneMessage(collector);
- }
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Collecting tweets finished");
- }
- }
-
- /**
- * Put tweets into output
- *
- * @param collector
- * Collector in which the tweets are collected.
- */
- protected void collectMessages(Collector<String> collector) {
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Tweet-stream begins");
- }
-
- while (isRunning) {
- collectOneMessage(collector);
- }
- }
-
- /**
- * Put one tweet into the output.
- *
- * @param collector
- * Collector in which the tweets are collected.
- */
- protected void collectOneMessage(Collector<String> collector) {
- if (client.isDone()) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent()
- .getMessage());
- }
- }
-
- try {
- String msg = queue.poll(waitSec, TimeUnit.SECONDS);
- if (msg != null) {
- collector.collect(msg);
- } else {
- if (LOG.isInfoEnabled()) {
- LOG.info("Did not receive a message in {} seconds", waitSec);
- }
- }
- } catch (InterruptedException e) {
- throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
- }
-
- }
-
private void closeConnection() {
if (LOG.isInfoEnabled()) {
@@ -273,50 +202,29 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
@Override
public boolean reachedEnd() throws Exception {
- if (currentNumberOfTweets >= maxNumberOfTweets) {
- return false;
- }
-
- if (nextElement != null) {
+ if (maxNumberOfTweets != -1 && currentNumberOfTweets >= maxNumberOfTweets){
+ closeConnection();
return true;
}
+
if (client.isDone()) {
if (LOG.isErrorEnabled()) {
LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent()
.getMessage());
}
- return false;
- }
-
- try {
- String msg = queue.poll(waitSec, TimeUnit.SECONDS);
- if (msg != null) {
- nextElement = msg;
- return true;
- } else {
- if (LOG.isInfoEnabled()) {
- LOG.info("Did not receive a message in {} seconds", waitSec);
- }
- }
- } catch (InterruptedException e) {
- throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
+ return true;
}
+
return false;
}
@Override
public String next() throws Exception {
- if (nextElement != null) {
- String result = nextElement;
- nextElement = null;
- return result;
- }
if (reachedEnd()) {
throw new RuntimeException("Twitter stream end reached.");
- } else {
- String result = nextElement;
- nextElement = null;
- return result;
}
+
+ String msg = queue.take();
+ return msg;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bf9cc81a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties
new file mode 100644
index 0000000..1ca4143
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+secret=***
+consumerSecret=***
+token=***-***
+consumerKey=***