You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:03 UTC

[26/51] [abbrv] git commit: [streaming] Twitter connector prototype

[streaming] Twitter connector prototype


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b3cd5fd0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b3cd5fd0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b3cd5fd0

Branch: refs/heads/master
Commit: b3cd5fd0262dec5277743d1e3f80548bc104dd8e
Parents: ee7c4a8
Author: Eszes Dávid <es...@gmail.com>
Authored: Fri Aug 1 11:54:05 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:22:11 2014 +0200

----------------------------------------------------------------------
 .../connectors/twitter/TwitterLocal.java        | 106 ++++++++
 .../connectors/twitter/TwitterSource.java       | 243 +++++++++++++++++++
 .../connectors/twitter/TwitterStreaming.java    | 107 ++++++++
 3 files changed, 456 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3cd5fd0/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
new file mode 100644
index 0000000..138fe05
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.examples.function.JSONParseFlatMap;
+import org.apache.flink.streaming.examples.wordcount.WordCountCounter;
+import org.apache.flink.util.Collector;
+
+/**
+ * This program demonstrates the use of TwitterSource.
+ * Its aim is to count the frequency of the languages of tweets.
+ */
+public class TwitterLocal {
+
+	private static final int PARALLELISM = 1;
+	private static final int SOURCE_PARALLELISM = 1;
+
+	/**
+	 * FlatMapFunction to determine the language of tweets if possible 
+	 */
+	public static class SelectLanguageFlatMap extends
+			JSONParseFlatMap<Tuple1<String>, Tuple1<String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * Select the language from the incoming JSON text
+		 */
+		@Override
+		public void flatMap(Tuple1<String> value, Collector<Tuple1<String>> out) throws Exception {
+
+			out.collect(new Tuple1<String>(colationOfNull(getField(value.f0, "lang"))));
+		}
+
+		/**
+		 * Change the null String to space character. Useful when null is undesirable.
+		 * @param in
+		 * @return
+		 */
+		protected String colationOfNull(String in) {
+			if (in == null) {
+				return " ";
+			}
+			return in;
+		}
+	}
+
+	/**
+	 * Builds a local streaming job that reads from a {@link TwitterSource}
+	 * limited to 100 collection attempts, extracts the language of every
+	 * message, partitions by it, and prints each tuple produced by
+	 * {@link WordCountCounter} to standard output.
+	 *
+	 * @param args
+	 *            exactly one element: the path of the properties file that
+	 *            holds the Twitter authentication data
+	 */
+	public static void main(String[] args) {
+
+		// Exactly one argument is required; anything else only prints the usage text.
+		if (args == null || args.length != 1) {
+			System.err.println("USAGE:\n TwitterLocal <pathToPropertiesFile>");
+			return;
+		}
+		String path = args[0];
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(PARALLELISM);
+
+		DataStream<Tuple1<String>> streamSource = env.addSource(new TwitterSource(path, 100),
+				SOURCE_PARALLELISM);
+
+		DataStream<Tuple2<String, Integer>> dataStream = streamSource
+				.flatMap(new SelectLanguageFlatMap())
+				.partitionBy(0)
+				.map(new WordCountCounter());
+
+		dataStream.addSink(new SinkFunction<Tuple2<String, Integer>>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void invoke(Tuple2<String, Integer> tuple) {
+				System.out.println(tuple);
+			}
+		});
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3cd5fd0/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
new file mode 100644
index 0000000..bbff732
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -0,0 +1,243 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.core.processor.StringDelimitedProcessor;
+import com.twitter.hbc.httpclient.BasicClient;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+
+/**
+ * Implementation of {@link SourceFunction} specialized to emit tweets from Twitter.
+ * It can connect to Twitter Streaming API, collect tweets and emit each of them
+ * as a {@link Tuple1} holding the received message String.
+ */
+public class TwitterSource extends SourceFunction<Tuple1<String>> {
+
+	private static final Log LOG = LogFactory.getLog(TwitterSource.class);
+
+	private static final long serialVersionUID = 1L;
+	// Location of the properties file with the authentication data
+	private String authPath;
+	// Buffer filled by the client's message processor; created in initializeConnection()
+	private transient BlockingQueue<String> queue;
+	// Capacity of the message queue
+	private int queueSize = 10000;
+	// Hosebird client; created in initializeClient()
+	private transient BasicClient client;
+	// Maximum number of seconds to wait for one message from the queue
+	private int waitSec = 5;
+
+	// true: collect without a limit; false: collect numberOfTweets times
+	private boolean streaming;
+	private int numberOfTweets;
+
+	/**
+	 * Creates a {@link TwitterSource} that runs in streaming mode, i.e.
+	 * without a limit on the number of collected tweets.
+	 *
+	 * @param authPath
+	 *            Location of the properties file containing the required
+	 *            authentication information.
+	 */
+	public TwitterSource(String authPath) {
+		this.streaming = true;
+		this.authPath = authPath;
+	}
+
+	/**
+	 * Creates a {@link TwitterSource} that runs in non-streaming mode and
+	 * collects a finite number of tweets.
+	 *
+	 * @param authPath
+	 *            Location of the properties file containing the required
+	 *            authentication information.
+	 * @param numberOfTweets
+	 *            how many times a tweet is collected before the source finishes
+	 */
+	public TwitterSource(String authPath, int numberOfTweets) {
+		this.streaming = false;
+		this.numberOfTweets = numberOfTweets;
+		this.authPath = authPath;
+	}
+
+	/**
+	 * Opens the connection, forwards tweets to the collector (without limit in
+	 * streaming mode, otherwise {@code numberOfTweets} times) and closes the
+	 * connection afterwards.
+	 *
+	 * @param collector
+	 *            receives every collected tweet as a one-field tuple
+	 * @throws Exception
+	 *             if connecting or collecting fails
+	 */
+	@Override
+	public void invoke(Collector<Tuple1<String>> collector) throws Exception {
+
+		// Outside the try block: if this fails there is no client to stop yet
+		initializeConnection();
+
+		try {
+			if (streaming) {
+				collectMessages(collector);
+			} else {
+				collectMessages(collector, numberOfTweets);
+			}
+		} finally {
+			// Stop the client even when collecting throws, so it is not leaked
+			closeConnection();
+		}
+	}
+
+	/**
+	 * Sets up everything needed to consume Twitter's Streaming API with the
+	 * Hosebird Client: the message queue, the sample endpoint (stall warnings
+	 * switched off), the authentication and finally the connected client.
+	 */
+	private void initializeConnection() {
+
+		// Constant messages: LOG.info is called without an explicit level check
+		LOG.info("Initializing Twitter Streaming API connection");
+
+		queue = new LinkedBlockingQueue<String>(queueSize);
+
+		StatusesSampleEndpoint sampleEndpoint = new StatusesSampleEndpoint();
+		sampleEndpoint.stallWarnings(false);
+
+		initializeClient(sampleEndpoint, authenticate());
+
+		LOG.info("Twitter Streaming API connection established successfully");
+	}
+
+	/**
+	 * Builds the OAuth1 credentials from the "consumerKey", "consumerSecret",
+	 * "token" and "secret" entries of the authentication properties.
+	 *
+	 * @return the OAuth1 object created from those four property values
+	 */
+	private OAuth1 authenticate() {
+
+		Properties props = loadAuthenticationProperties();
+
+		String consumerKey = props.getProperty("consumerKey");
+		String consumerSecret = props.getProperty("consumerSecret");
+		String token = props.getProperty("token");
+		String tokenSecret = props.getProperty("secret");
+
+		return new OAuth1(consumerKey, consumerSecret, token, tokenSecret);
+	}
+
+	/**
+	 * Reads the properties file at {@code authPath} for the authentication data.
+	 *
+	 * @return the authentication data.
+	 * @throws RuntimeException
+	 *             if the file cannot be opened or read; the original
+	 *             IOException is kept as the cause
+	 */
+	private Properties loadAuthenticationProperties() {
+		Properties properties = new Properties();
+		try {
+			InputStream input = new FileInputStream(authPath);
+			try {
+				properties.load(input);
+			} finally {
+				// Release the file handle even when loading fails
+				input.close();
+			}
+		} catch (IOException ioe) {
+			// Fail here instead of continuing with empty credentials
+			throw new RuntimeException("Cannot open .properties file: " + authPath,
+					ioe);
+		}
+		return properties;
+	}
+
+	/**
+	 * Builds the client named "twitterSourceClient" for the stream host with
+	 * the given endpoint and authentication, lets it deliver messages into
+	 * {@code queue}, and connects it.
+	 *
+	 * @param endpoint
+	 *            the endpoint the client reads from
+	 * @param auth
+	 *            the authentication used by the client
+	 */
+	private void initializeClient(StatusesSampleEndpoint endpoint,
+			Authentication auth) {
+
+		ClientBuilder builder = new ClientBuilder()
+				.name("twitterSourceClient")
+				.hosts(Constants.STREAM_HOST)
+				.endpoint(endpoint)
+				.authentication(auth)
+				.processor(new StringDelimitedProcessor(queue));
+
+		client = builder.build();
+		client.connect();
+	}
+
+	/**
+	 * Calls {@link #collectOneMessage(Collector)} {@code piece} times. An
+	 * attempt that receives no message within the wait time still counts, so
+	 * fewer than {@code piece} tweets may end up in the collector.
+	 *
+	 * @param collector
+	 *            receives the collected tweets
+	 * @param piece
+	 *            number of collection attempts
+	 */
+	protected void collectMessages(Collector<Tuple1<String>> collector, int piece) {
+
+		// Constant messages: LOG.info is called without an explicit level check
+		LOG.info("Collecting tweets");
+
+		for (int remaining = piece; remaining > 0; remaining--) {
+			collectOneMessage(collector);
+		}
+
+		LOG.info("Collecting tweets finished");
+	}
+
+	/**
+	 * Puts tweets into the collector for as long as the client is running.
+	 * Returns once the client reports that it is done, so the caller can
+	 * release the connection.
+	 *
+	 * @param collector
+	 *            receives the collected tweets
+	 */
+	protected void collectMessages(Collector<Tuple1<String>> collector) {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Tweet-stream begins");
+		}
+
+		// Stop polling once the client has terminated; no new messages can
+		// arrive after that and the loop would otherwise never end
+		while (!client.isDone()) {
+			collectOneMessage(collector);
+		}
+
+		if (LOG.isErrorEnabled()) {
+			LOG.error("Client connection closed unexpectedly: "
+					+ client.getExitEvent().getMessage());
+		}
+	}
+
+	/**
+	 * Put one tweet into the collector. Waits up to {@code waitSec} seconds
+	 * for a message; if none arrives, nothing is collected and the timeout is
+	 * logged.
+	 *
+	 * @param collector
+	 *            receives the tweet as a one-field tuple
+	 * @throws RuntimeException
+	 *             if the thread is interrupted while waiting for a message
+	 */
+	protected void collectOneMessage(Collector<Tuple1<String>> collector) {
+		if (client.isDone()) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Client connection closed unexpectedly: "
+						+ client.getExitEvent().getMessage());
+			}
+		}
+
+		try {
+			String msg = queue.poll(waitSec, TimeUnit.SECONDS);
+			if (msg != null) {
+				collector.collect(new Tuple1<String>(msg));
+			} else {
+				if (LOG.isInfoEnabled()) {
+					LOG.info("Did not receive a message in " + waitSec
+							+ " seconds");
+				}
+			}
+		} catch (InterruptedException e) {
+			// Restore the interrupt flag so callers can still observe it
+			Thread.currentThread().interrupt();
+			throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
+		}
+
+	}
+
+	/**
+	 * Stops the client and logs the start and the end of the shutdown.
+	 */
+	private void closeConnection() {
+
+		// Constant messages: LOG.info is called without an explicit level check
+		LOG.info("Initiating connection close");
+
+		client.stop();
+
+		LOG.info("Connection closed successfully");
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3cd5fd0/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
new file mode 100644
index 0000000..805bf06
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -0,0 +1,107 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.examples.function.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+
+public class TwitterStreaming {
+
+	private static final int PARALLELISM = 1;
+	private static final int SOURCE_PARALLELISM = 1;
+
+	public static class TwitterSink extends SinkFunction<Tuple5<Long, Long, String, String, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(Tuple5<Long, Long, String, String, String> tuple) {
+			System.out.println(tuple.f0 + " " + tuple.f1 + " " + tuple.f4);
+			System.out.println("NAME: " + tuple.f2);
+			System.out.println(tuple.f3);
+			System.out.println(" ");
+		}
+
+	}
+	
+	/**
+	 * Flat-map step that reads the "id", "created_at", "user.name", "text" and
+	 * "lang" fields from the JSON text of a tweet and emits them as one tuple.
+	 */
+	public static class SelectDataFlatMap extends
+			JSONParseFlatMap<Tuple1<String>, Tuple5<Long, Long, String, String, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * Emits (id, encoded creation date, user name, text, language) for the
+		 * JSON text in {@code value.f0}. A null user name or text becomes a
+		 * single space; the language is passed on as returned by the lookup.
+		 */
+		@Override
+		public void flatMap(Tuple1<String> value,
+				Collector<Tuple5<Long, Long, String, String, String>> out)
+				throws Exception {
+
+			String json = value.f0;
+
+			// Fields are looked up in the same order as the tuple positions
+			Long id = convertDateString2Long(getField(json, "id"));
+			Long createdAt = convertDateString2LongDate(getField(json, "created_at"));
+			String userName = colationOfNull(getField(json, "user.name"));
+			String text = colationOfNull(getField(json, "text"));
+			String language = getField(json, "lang");
+
+			out.collect(new Tuple5<Long, Long, String, String, String>(
+					id, createdAt, userName, text, language));
+		}
+
+		/**
+		 * Replaces a null String with a single space character.
+		 *
+		 * @param in
+		 *            the String to check, may be null
+		 * @return {@code " "} when {@code in} is null, otherwise {@code in} itself
+		 */
+		protected String colationOfNull(String in) {
+			return in == null ? " " : in;
+		}
+
+		/**
+		 * Splits the date String at spaces and combines the third and the
+		 * sixth token into one number: third * 100000 + sixth.
+		 * NOTE(review): presumably these tokens are the day of month and the
+		 * year of Twitter's "created_at" format - confirm.
+		 *
+		 * @param dateString
+		 *            the date text, may be null
+		 * @return the combined number, or 0 when {@code dateString} is null
+		 */
+		protected Long convertDateString2LongDate(String dateString) {
+			if (dateString == null) {
+				return 0L;
+			}
+			String[] tokens = dateString.split(" ");
+			long thirdToken = Long.parseLong(tokens[2]);
+			long sixthToken = Long.parseLong(tokens[5]);
+			return thirdToken * 100000 + sixthToken;
+		}
+
+		/**
+		 * Parses the given String as a long; despite its name it is used for
+		 * the "id" field in {@code flatMap}.
+		 *
+		 * @param dateString
+		 *            the numeric text, may be null
+		 * @return the parsed value, or 0 when {@code dateString} is null
+		 */
+		protected Long convertDateString2Long(String dateString) {
+			if (dateString == null) {
+				return 0L;
+			}
+			return Long.parseLong(dateString);
+		}
+	}
+
+	/**
+	 * Builds a local streaming job that reads from a {@link TwitterSource}
+	 * limited to 100 collection attempts, selects a few fields of every
+	 * message and prints them with {@link TwitterSink}.
+	 *
+	 * @param args
+	 *            optional; when present, the first element is the path of the
+	 *            properties file that holds the Twitter authentication data
+	 */
+	public static void main(String[] args) {
+
+		// Fallback keeps the previously hard-coded location working when no
+		// argument is given
+		String path = "/home/eszes/git/auth.properties";
+		if (args != null && args.length > 0) {
+			path = args[0];
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(PARALLELISM);
+
+		DataStream<Tuple1<String>> streamSource = env.addSource(
+				new TwitterSource(path,100), SOURCE_PARALLELISM);
+
+		DataStream<Tuple5<Long, Long, String, String, String>> selectedDataStream = streamSource
+				.flatMap(new SelectDataFlatMap());
+
+		selectedDataStream.addSink(new TwitterSink());
+
+		env.execute();
+	}
+}