You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/06/03 12:22:25 UTC

flink git commit: Implemented TwitterFilterSource and adapted TwitterSource

Repository: flink
Updated Branches:
  refs/heads/master bc7f6d9e5 -> bf9cc81a7


Implemented TwitterFilterSource and adapted TwitterSource

renamed TwitterFilterSourceTest to TwitterFilterSourceExample and added
JavaDoc comments.

Added a dummy twitter.properties to the resources folder

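adapted TwitterSource.java: it now extends RichSourceFunction instead of RichParallelSourceFunction, and its connection setup methods are protected so that subclasses can reuse them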

This closes #695


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf9cc81a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf9cc81a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf9cc81a

Branch: refs/heads/master
Commit: bf9cc81a79951ebac40b00a5d8f67128a86efe4e
Parents: bc7f6d9
Author: yildirim <yi...@gronau.neofonie.priv>
Authored: Tue May 19 14:27:27 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jun 3 12:21:51 2015 +0200

----------------------------------------------------------------------
 .../connectors/twitter/TwitterFilterSource.java | 280 +++++++++++++++++++
 .../twitter/TwitterFilterSourceExample.java     |  68 +++++
 .../connectors/twitter/TwitterSource.java       | 132 ++-------
 .../src/main/resources/twitter.properties       |  19 ++
 4 files changed, 387 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bf9cc81a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
new file mode 100644
index 0000000..8dd4458
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.hbc.core.endpoint.Location;
+import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
+import com.twitter.hbc.httpclient.auth.Authentication;
+
+/**
+ * An extension of {@link TwitterSource} that connects to the Twitter
+ * Streaming API filter endpoint ({@link StatusesFilterEndpoint}), so that the
+ * stream can be restricted by user-defined parameters: tracked terms,
+ * languages, followed users, locations, and extra query/POST parameters.
+ *
+ * <p>All parameters must be set before the connection is initialized; they
+ * are copied onto the endpoint in {@link #initializeConnection()}.
+ */
+public class TwitterFilterSource extends TwitterSource {
+
+	private static final Logger LOG = LoggerFactory
+			.getLogger(TwitterFilterSource.class);
+
+	private static final long serialVersionUID = 1L;
+
+	private final List<String> trackTerms = new ArrayList<String>();
+
+	private final List<String> languages = new ArrayList<String>();
+
+	private final List<Long> followings = new ArrayList<Long>();
+
+	// NOTE(review): source functions are shipped to the cluster via Java
+	// serialization. If hbc's Location is not java.io.Serializable (TODO
+	// confirm for the hbc version in use), any location filter breaks that.
+	private final List<Location> locations = new ArrayList<Location>();
+
+	private final Map<String, String> queryParameters = new HashMap<String, String>();
+
+	private final Map<String, String> postParameters = new HashMap<String, String>();
+
+	/**
+	 * Creates a source that reads the filtered Twitter stream.
+	 *
+	 * @param authPath
+	 *            Path of the properties file with the OAuth credentials.
+	 */
+	public TwitterFilterSource(String authPath) {
+		super(authPath);
+	}
+
+	/**
+	 * Opens a connection to the filter endpoint, configured with the
+	 * parameters set on this source.
+	 */
+	@Override
+	protected void initializeConnection() {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Initializing Twitter Streaming API connection");
+		}
+		queue = new LinkedBlockingQueue<String>(queueSize);
+
+		StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
+		configEndpoint(endpoint);
+		endpoint.stallWarnings(false);
+
+		Authentication auth = authenticate();
+
+		initializeClient(endpoint, auth);
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Twitter Streaming API connection established successfully");
+		}
+	}
+
+	/**
+	 * Copies the filter parameters of this source onto the endpoint. Empty
+	 * term, language, following and location lists are not passed on.
+	 *
+	 * @param endpoint
+	 *            The streaming endpoint to configure.
+	 */
+	private void configEndpoint(StatusesFilterEndpoint endpoint) {
+		if (!trackTerms.isEmpty()) {
+			endpoint.trackTerms(trackTerms);
+		}
+		if (!languages.isEmpty()) {
+			endpoint.languages(languages);
+		}
+		if (!followings.isEmpty()) {
+			endpoint.followings(followings);
+		}
+		if (!locations.isEmpty()) {
+			endpoint.locations(locations);
+		}
+		for (Entry<String, String> entry : queryParameters.entrySet()) {
+			endpoint.addQueryParameter(entry.getKey(), entry.getValue());
+		}
+		for (Entry<String, String> entry : postParameters.entrySet()) {
+			endpoint.addPostParameter(entry.getKey(), entry.getValue());
+		}
+	}
+
+	/**
+	 * Adds a term to track.
+	 *
+	 * @param term
+	 *            The term to track.
+	 */
+	public void trackTerm(String term) {
+		this.trackTerms.add(term);
+	}
+
+	/**
+	 * Adds several terms to track.
+	 *
+	 * @param terms
+	 *            The terms to track.
+	 */
+	public void trackTerms(Collection<String> terms) {
+		this.trackTerms.addAll(terms);
+	}
+
+	/**
+	 * Returns the tracked terms.
+	 *
+	 * @return A read-only view of the tracked terms.
+	 */
+	public List<String> getTrackTerms() {
+		return Collections.unmodifiableList(this.trackTerms);
+	}
+
+	/**
+	 * Adds a language to filter by.
+	 *
+	 * @param language
+	 *            The language to filter by.
+	 */
+	public void filterLanguage(String language) {
+		this.languages.add(language);
+	}
+
+	/**
+	 * Adds several languages to filter by.
+	 *
+	 * @param languages
+	 *            The languages to filter by.
+	 */
+	public void filterLanguages(Collection<String> languages) {
+		this.languages.addAll(languages);
+	}
+
+	/**
+	 * Returns the languages to filter by.
+	 *
+	 * @return A read-only view of the filtered languages.
+	 */
+	public List<String> getLanguages() {
+		return Collections.unmodifiableList(this.languages);
+	}
+
+	/**
+	 * Adds a user to follow.
+	 *
+	 * @param userID
+	 *            The ID of the user to follow.
+	 */
+	public void filterFollowings(Long userID) {
+		this.followings.add(userID);
+	}
+
+	/**
+	 * Adds several users to follow.
+	 *
+	 * @param userIDs
+	 *            The IDs of the users to follow.
+	 */
+	public void filterFollowings(Collection<Long> userIDs) {
+		this.followings.addAll(userIDs);
+	}
+
+	/**
+	 * Returns the users to follow.
+	 *
+	 * @return A read-only view of the followed user IDs.
+	 */
+	public List<Long> getFollowings() {
+		return Collections.unmodifiableList(this.followings);
+	}
+
+	/**
+	 * Adds a location to filter by.
+	 *
+	 * @param location
+	 *            The location to filter by.
+	 */
+	public void filterLocation(Location location) {
+		this.locations.add(location);
+	}
+
+	/**
+	 * Adds several locations to filter by.
+	 *
+	 * @param locations
+	 *            The locations to filter by.
+	 */
+	public void filterLocations(Collection<Location> locations) {
+		this.locations.addAll(locations);
+	}
+
+	/**
+	 * Returns the locations to filter by.
+	 *
+	 * @return A read-only view of the filtered locations.
+	 */
+	public List<Location> getLocations() {
+		return Collections.unmodifiableList(this.locations);
+	}
+
+	/**
+	 * Adds a query parameter for the endpoint, replacing any previous value
+	 * of the same name.
+	 *
+	 * @param parameter
+	 *            The name of the query parameter.
+	 * @param value
+	 *            The value of the query parameter.
+	 */
+	public void addQueryParameter(String parameter, String value) {
+		this.queryParameters.put(parameter, value);
+	}
+
+	/**
+	 * Adds several query parameters for the endpoint, replacing previous
+	 * values of the same names.
+	 *
+	 * @param queryParameters
+	 *            The query parameters for the endpoint.
+	 */
+	public void addQueryParameters(Map<String, String> queryParameters) {
+		this.queryParameters.putAll(queryParameters);
+	}
+
+	/**
+	 * Returns the query parameters used by the endpoint.
+	 *
+	 * @return A read-only view of the query parameters.
+	 */
+	public Map<String, String> getQueryParameters() {
+		return Collections.unmodifiableMap(this.queryParameters);
+	}
+
+	/**
+	 * Adds a POST parameter for the endpoint, replacing any previous value
+	 * of the same name.
+	 *
+	 * @param parameter
+	 *            The name of the POST parameter.
+	 * @param value
+	 *            The value of the POST parameter.
+	 */
+	public void addPostParameter(String parameter, String value) {
+		this.postParameters.put(parameter, value);
+	}
+
+	/**
+	 * Adds several POST parameters for the endpoint, replacing previous
+	 * values of the same names.
+	 *
+	 * @param postParameters
+	 *            The POST parameters for the endpoint.
+	 */
+	public void addPostParameters(Map<String, String> postParameters) {
+		this.postParameters.putAll(postParameters);
+	}
+
+	/**
+	 * Returns the POST parameters used by the endpoint.
+	 *
+	 * @return A read-only view of the POST parameters.
+	 */
+	public Map<String, String> getPostParameters() {
+		return Collections.unmodifiableMap(this.postParameters);
+	}
+
+	/**
+	 * Returns the POST parameters used by the endpoint.
+	 *
+	 * @return A read-only view of the POST parameters.
+	 * @deprecated Use {@link #getPostParameters()}, which is named
+	 *             consistently with the other getters.
+	 */
+	@Deprecated
+	public Map<String, String> postParameters() {
+		return getPostParameters();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bf9cc81a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java
new file mode 100644
index 0000000..43cb179
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import java.net.URL;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Example of how to use {@link TwitterFilterSource}: it tracks the term
+ * "obama" in tweets filtered to the language "en" and prints every message
+ * it receives.
+ *
+ * <p>Before running the example, define the access keys in
+ * {@code twitter.properties} in the resource folder (they can be found in
+ * your Twitter account), or pass the path of your own properties file as
+ * the first program argument.
+ */
+public class TwitterFilterSourceExample {
+
+	/**
+	 * Classpath location of the bundled Twitter properties file.
+	 */
+	private static final String PATH_TO_AUTH_FILE = "/twitter.properties";
+
+	/**
+	 * Runs the example.
+	 *
+	 * @param args
+	 *            Optional: path of the Twitter properties file to use instead
+	 *            of the bundled {@code twitter.properties}.
+	 */
+	public static void main(String[] args) {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.getExecutionEnvironment();
+
+		TwitterFilterSource twitterSource = new TwitterFilterSource(
+				resolveAuthPath(args));
+
+		twitterSource.trackTerm("obama");
+		twitterSource.filterLanguage("en");
+
+		DataStream<String> streamSource = env.addSource(twitterSource);
+
+		streamSource.print();
+
+		try {
+			env.execute("Twitter Streaming Test");
+		} catch (Exception e) {
+			// Rethrow rather than only printing the stack trace, so that a
+			// failed job is reported as a failure of the program.
+			throw new RuntimeException("Twitter Streaming Test failed", e);
+		}
+	}
+
+	/**
+	 * Resolves the properties file to authenticate with.
+	 *
+	 * @param args
+	 *            The program arguments.
+	 * @return The first argument if present, otherwise the file path of the
+	 *         bundled {@code twitter.properties}.
+	 * @throws IllegalStateException
+	 *             If no argument is given and the bundled file is not on the
+	 *             classpath.
+	 */
+	private static String resolveAuthPath(String[] args) {
+		if (args.length > 0) {
+			return args[0];
+		}
+		URL resource = TwitterFilterSourceExample.class.getResource(PATH_TO_AUTH_FILE);
+		if (resource == null) {
+			throw new IllegalStateException("Could not find " + PATH_TO_AUTH_FILE
+					+ " on the classpath; pass the path of a Twitter properties file instead.");
+		}
+		return resource.getFile();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bf9cc81a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index 78e4aa5..0b47985 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -22,17 +22,16 @@ import java.io.InputStream;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.endpoint.DefaultStreamingEndpoint;
 import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
 import com.twitter.hbc.core.processor.StringDelimitedProcessor;
 import com.twitter.hbc.httpclient.BasicClient;
@@ -41,26 +40,22 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
 
 /**
  * Implementation of {@link SourceFunction} specialized to emit tweets from
- * Twitter. It can connect to Twitter Streaming API, collect tweets and
+ * Twitter.
  */
-public class TwitterSource extends RichParallelSourceFunction<String> {
+public class TwitterSource extends RichSourceFunction<String> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
 
 	private static final long serialVersionUID = 1L;
 	private String authPath;
-	private transient BlockingQueue<String> queue;
-	private int queueSize = 10000;
+	protected transient BlockingQueue<String> queue;
+	protected int queueSize = 10000;
 	private transient BasicClient client;
 	private int waitSec = 5;
 
 	private int maxNumberOfTweets;
 	private int currentNumberOfTweets;
 
-	private String nextElement = null;
-
-	private volatile boolean isRunning = false;
-
 	/**
 	 * Create {@link TwitterSource} for streaming
 	 * 
@@ -96,7 +91,7 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
 	/**
 	 * Initialize Hosebird Client to be able to consume Twitter's Streaming API
 	 */
-	private void initializeConnection() {
+	protected void initializeConnection() {
 
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Initializing Twitter Streaming API connection");
@@ -116,10 +111,10 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
 		}
 	}
 
-	private OAuth1 authenticate() {
+	protected OAuth1 authenticate() {
 
 		Properties authenticationProperties = loadAuthenticationProperties();
-
+		
 		return new OAuth1(authenticationProperties.getProperty("consumerKey"),
 				authenticationProperties.getProperty("consumerSecret"),
 				authenticationProperties.getProperty("token"),
@@ -132,6 +127,7 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
 	 * @return the authentication data.
 	 */
 	private Properties loadAuthenticationProperties() {
+		
 		Properties properties = new Properties();
 		try {
 			InputStream input = new FileInputStream(authPath);
@@ -143,82 +139,15 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
 		return properties;
 	}
 
-	private void initializeClient(StatusesSampleEndpoint endpoint, Authentication auth) {
+	protected void initializeClient(DefaultStreamingEndpoint endpoint, Authentication auth) {
 
 		client = new ClientBuilder().name("twitterSourceClient").hosts(Constants.STREAM_HOST)
 				.endpoint(endpoint).authentication(auth)
 				.processor(new StringDelimitedProcessor(queue)).build();
-
+		
 		client.connect();
 	}
 
-	/**
-	 * Put tweets into output
-	 * 
-	 * @param collector
-	 *            Collector in which the tweets are collected.
-	 */
-	protected void collectFiniteMessages(Collector<String> collector) {
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Collecting tweets");
-		}
-
-		for (int i = 0; i < maxNumberOfTweets; i++) {
-			collectOneMessage(collector);
-		}
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Collecting tweets finished");
-		}
-	}
-
-	/**
-	 * Put tweets into output
-	 * 
-	 * @param collector
-	 *            Collector in which the tweets are collected.
-	 */
-	protected void collectMessages(Collector<String> collector) {
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Tweet-stream begins");
-		}
-
-		while (isRunning) {
-			collectOneMessage(collector);
-		}
-	}
-
-	/**
-	 * Put one tweet into the output.
-	 * 
-	 * @param collector
-	 *            Collector in which the tweets are collected.
-	 */
-	protected void collectOneMessage(Collector<String> collector) {
-		if (client.isDone()) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent()
-						.getMessage());
-			}
-		}
-
-		try {
-			String msg = queue.poll(waitSec, TimeUnit.SECONDS);
-			if (msg != null) {
-				collector.collect(msg);
-			} else {
-				if (LOG.isInfoEnabled()) {
-					LOG.info("Did not receive a message in {} seconds", waitSec);
-				}
-			}
-		} catch (InterruptedException e) {
-			throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
-		}
-
-	}
-
 	private void closeConnection() {
 
 		if (LOG.isInfoEnabled()) {
@@ -273,50 +202,44 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
 
 	@Override
 	public boolean reachedEnd() throws Exception {
-		if (currentNumberOfTweets >= maxNumberOfTweets) {
-			return false;
-		}
-
-		if (nextElement != null) {
+		// A limit of -1 means unbounded; otherwise stop after maxNumberOfTweets.
+		if (maxNumberOfTweets != -1 && currentNumberOfTweets >= maxNumberOfTweets) {
+			closeConnection();
 			return true;
 		}
+
 		if (client.isDone()) {
 			if (LOG.isErrorEnabled()) {
 				LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent()
 						.getMessage());
 			}
-			return false;
-		}
-
-		try {
-			String msg = queue.poll(waitSec, TimeUnit.SECONDS);
-			if (msg != null) {
-				nextElement = msg;
-				return true;
-			} else {
-				if (LOG.isInfoEnabled()) {
-					LOG.info("Did not receive a message in {} seconds", waitSec);
-				}
-			}
-		} catch (InterruptedException e) {
-			throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
+			return true;
 		}
+
 		return false;
 	}
 
 	@Override
 	public String next() throws Exception {
-		if (nextElement != null) {
-			String result = nextElement;
-			nextElement = null;
-			return result;
-		}
 		if (reachedEnd()) {
 			throw new RuntimeException("Twitter stream end reached.");
-		} else {
-			String result = nextElement;
-			nextElement = null;
-			return result;
 		}
+
+		// Poll with a timeout instead of blocking in take(): if the Hosebird
+		// client terminates while we wait, take() would never return.
+		String msg = queue.poll(waitSec, java.util.concurrent.TimeUnit.SECONDS);
+		while (msg == null) {
+			if (client.isDone()) {
+				throw new RuntimeException("Twitter stream end reached.");
+			}
+			if (LOG.isInfoEnabled()) {
+				LOG.info("Did not receive a message in {} seconds", waitSec);
+			}
+			msg = queue.poll(waitSec, java.util.concurrent.TimeUnit.SECONDS);
+		}
+
+		// Count emitted tweets so that reachedEnd() can enforce maxNumberOfTweets.
+		currentNumberOfTweets++;
+		return msg;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bf9cc81a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties
new file mode 100644
index 0000000..1ca4143
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+secret=***
+consumerSecret=***
+token=***-***
+consumerKey=***