You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:03 UTC
[26/51] [abbrv] git commit: [streaming] Twitter connector prototype
[streaming] Twitter connector prototype
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b3cd5fd0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b3cd5fd0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b3cd5fd0
Branch: refs/heads/master
Commit: b3cd5fd0262dec5277743d1e3f80548bc104dd8e
Parents: ee7c4a8
Author: Eszes Dávid <es...@gmail.com>
Authored: Fri Aug 1 11:54:05 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:22:11 2014 +0200
----------------------------------------------------------------------
.../connectors/twitter/TwitterLocal.java | 106 ++++++++
.../connectors/twitter/TwitterSource.java | 243 +++++++++++++++++++
.../connectors/twitter/TwitterStreaming.java | 107 ++++++++
3 files changed, 456 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3cd5fd0/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
new file mode 100644
index 0000000..138fe05
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -0,0 +1,106 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.examples.function.JSONParseFlatMap;
+import org.apache.flink.streaming.examples.wordcount.WordCountCounter;
+import org.apache.flink.util.Collector;
+
+/**
+ * This program demonstrates the use of {@link TwitterSource}.
+ * Its aim is to count the frequency of the languages of tweets.
+ */
+public class TwitterLocal {
+
+	private static final int PARALLELISM = 1;
+	private static final int SOURCE_PARALLELISM = 1;
+
+	/** Number of polls the {@link TwitterSource} performs before it finishes. */
+	private static final int NUMBER_OF_TWEETS = 100;
+
+	/**
+	 * FlatMapFunction to determine the language of tweets if possible.
+	 */
+	public static class SelectLanguageFlatMap extends
+			JSONParseFlatMap<Tuple1<String>, Tuple1<String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * Selects the "lang" field from the incoming JSON text and emits it. If the field
+		 * lookup yields null, a single space is emitted instead.
+		 */
+		@Override
+		public void flatMap(Tuple1<String> value, Collector<Tuple1<String>> out) throws Exception {
+			out.collect(new Tuple1<String>(colationOfNull(getField(value.f0, "lang"))));
+		}
+
+		/**
+		 * Changes a null String to a single space character. Useful when null is undesirable.
+		 *
+		 * @param in
+		 *            the string to check, may be null
+		 * @return {@code in} itself, or {@code " "} when {@code in} is null
+		 */
+		protected String colationOfNull(String in) {
+			return in == null ? " " : in;
+		}
+	}
+
+	/**
+	 * Counts the languages of tweets read from Twitter and prints the resulting
+	 * (language, count) tuples to standard out.
+	 *
+	 * @param args
+	 *            exactly one argument: the path of the properties file that holds the
+	 *            Twitter authentication data
+	 */
+	public static void main(String[] args) {
+
+		if (args == null || args.length != 1) {
+			System.err.println("USAGE:\n TwitterLocal <pathToPropertiesFile>");
+			return;
+		}
+		String path = args[0];
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(PARALLELISM);
+
+		DataStream<Tuple1<String>> streamSource = env.addSource(
+				new TwitterSource(path, NUMBER_OF_TWEETS), SOURCE_PARALLELISM);
+
+		// Group by the language field so each counter instance sees all tweets of a language.
+		DataStream<Tuple2<String, Integer>> dataStream = streamSource
+				.flatMap(new SelectLanguageFlatMap())
+				.partitionBy(0)
+				.map(new WordCountCounter());
+
+		dataStream.addSink(new SinkFunction<Tuple2<String, Integer>>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void invoke(Tuple2<String, Integer> tuple) {
+				System.out.println(tuple);
+			}
+		});
+
+		env.execute();
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3cd5fd0/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
new file mode 100644
index 0000000..bbff732
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -0,0 +1,243 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.core.processor.StringDelimitedProcessor;
+import com.twitter.hbc.httpclient.BasicClient;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+
+/**
+ * Implementation of {@link SourceFunction} specialized to emit tweets from Twitter.
+ * It connects to the Twitter Streaming API (sample endpoint) through the Hosebird Client
+ * and emits every received message as a {@link Tuple1} holding the raw message string.
+ * In streaming mode it runs until the client terminates (or the thread is interrupted);
+ * otherwise it performs a fixed number of polls, each of which waits up to a few seconds
+ * and may time out without emitting anything.
+ */
+public class TwitterSource extends SourceFunction<Tuple1<String>> {
+
+	private static final Log LOG = LogFactory.getLog(TwitterSource.class);
+
+	private static final long serialVersionUID = 1L;
+
+	/** Location of the properties file holding the OAuth credentials. */
+	private final String authPath;
+	/** Capacity of the buffer between the Hosebird Client and this source. */
+	private final int queueSize = 10000;
+	/** Seconds a single poll waits for a message before giving up. */
+	private final int waitSec = 5;
+	/** True: emit until the client stops; false: poll {@link #numberOfTweets} times. */
+	private final boolean streaming;
+	/** Number of polls in non-streaming mode. */
+	private final int numberOfTweets;
+
+	private transient BlockingQueue<String> queue;
+	private transient BasicClient client;
+
+	/**
+	 * Create {@link TwitterSource} for streaming.
+	 *
+	 * @param authPath
+	 *            Location of the properties file containing the required authentication information.
+	 */
+	public TwitterSource(String authPath) {
+		this.authPath = authPath;
+		this.streaming = true;
+		this.numberOfTweets = 0;
+	}
+
+	/**
+	 * Create {@link TwitterSource} to collect a finite number of tweets.
+	 *
+	 * @param authPath
+	 *            Location of the properties file containing the required authentication information.
+	 * @param numberOfTweets
+	 *            Number of polls to perform; a poll that times out emits nothing, so at most
+	 *            this many tweets are emitted.
+	 */
+	public TwitterSource(String authPath, int numberOfTweets) {
+		this.authPath = authPath;
+		this.streaming = false;
+		this.numberOfTweets = numberOfTweets;
+	}
+
+	/**
+	 * Opens the connection, emits tweets into {@code collector} according to the configured
+	 * mode, and always closes the connection afterwards, even if collecting fails.
+	 */
+	@Override
+	public void invoke(Collector<Tuple1<String>> collector) throws Exception {
+		try {
+			initializeConnection();
+
+			if (streaming) {
+				collectMessages(collector);
+			} else {
+				collectMessages(collector, numberOfTweets);
+			}
+		} finally {
+			closeConnection();
+		}
+	}
+
+	/**
+	 * Initialize Hosebird Client to be able to consume Twitter's Streaming API.
+	 */
+	private void initializeConnection() {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Initializing Twitter Streaming API connection");
+		}
+
+		queue = new LinkedBlockingQueue<String>(queueSize);
+
+		StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
+		endpoint.stallWarnings(false);
+
+		Authentication auth = authenticate();
+
+		initializeClient(endpoint, auth);
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Twitter Streaming API connection established successfully");
+		}
+	}
+
+	/**
+	 * Builds the OAuth1 credentials from the properties file at {@link #authPath}.
+	 *
+	 * @throws RuntimeException
+	 *             if the file cannot be read or a required key is missing
+	 */
+	private OAuth1 authenticate() {
+
+		Properties authenticationProperties = loadAuthenticationProperties();
+
+		return new OAuth1(getRequiredProperty(authenticationProperties, "consumerKey"),
+				getRequiredProperty(authenticationProperties, "consumerSecret"),
+				getRequiredProperty(authenticationProperties, "token"),
+				getRequiredProperty(authenticationProperties, "secret"));
+	}
+
+	/**
+	 * Returns the value of {@code key}, failing fast instead of passing null credentials on.
+	 */
+	private String getRequiredProperty(Properties properties, String key) {
+		String value = properties.getProperty(key);
+		if (value == null) {
+			throw new RuntimeException("Missing property '" + key + "' in .properties file: "
+					+ authPath);
+		}
+		return value;
+	}
+
+	/**
+	 * Reads the given properties file for the authentication data.
+	 *
+	 * @return the authentication data.
+	 * @throws RuntimeException
+	 *             wrapping the {@link IOException} if the file cannot be read
+	 */
+	private Properties loadAuthenticationProperties() {
+		Properties properties = new Properties();
+		InputStream input = null;
+		try {
+			input = new FileInputStream(authPath);
+			properties.load(input);
+		} catch (IOException ioe) {
+			throw new RuntimeException("Cannot open .properties file: " + authPath, ioe);
+		} finally {
+			// Close in finally so the stream is not leaked when load() fails.
+			if (input != null) {
+				try {
+					input.close();
+				} catch (IOException closeError) {
+					// The properties are already read (or loading failed); only log it.
+					LOG.warn("Could not close .properties file: " + authPath, closeError);
+				}
+			}
+		}
+		return properties;
+	}
+
+	/**
+	 * Builds the Hosebird Client that pushes delimited messages into {@link #queue}, then
+	 * connects it.
+	 */
+	private void initializeClient(StatusesSampleEndpoint endpoint, Authentication auth) {
+
+		client = new ClientBuilder().name("twitterSourceClient")
+				.hosts(Constants.STREAM_HOST).endpoint(endpoint)
+				.authentication(auth)
+				.processor(new StringDelimitedProcessor(queue)).build();
+
+		client.connect();
+	}
+
+	/**
+	 * Put tweets into collector by polling the queue a fixed number of times.
+	 *
+	 * @param collector
+	 *            receives each tweet that arrives in time
+	 * @param piece
+	 *            number of polls to perform
+	 */
+	protected void collectMessages(Collector<Tuple1<String>> collector, int piece) {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Collecting tweets");
+		}
+
+		for (int i = 0; i < piece; i++) {
+			collectOneMessage(collector);
+		}
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Collecting tweets finished");
+		}
+	}
+
+	/**
+	 * Put tweets into collector until the client has terminated and every buffered
+	 * message has been emitted.
+	 *
+	 * @param collector
+	 *            receives each tweet
+	 */
+	protected void collectMessages(Collector<Tuple1<String>> collector) {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Tweet-stream begins");
+		}
+
+		// Once the client is done nothing new can arrive; without this condition the loop
+		// would keep polling an abandoned queue (and logging an error) forever.
+		while (!client.isDone() || !queue.isEmpty()) {
+			collectOneMessage(collector);
+		}
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Tweet-stream ended");
+		}
+	}
+
+	/**
+	 * Put one tweet into the collector, waiting at most {@link #waitSec} seconds for it.
+	 *
+	 * @param collector
+	 *            receives the tweet, if one arrives in time
+	 * @throws RuntimeException
+	 *             if the thread is interrupted while waiting (the interrupt flag is restored)
+	 */
+	protected void collectOneMessage(Collector<Tuple1<String>> collector) {
+		if (client.isDone() && LOG.isErrorEnabled()) {
+			LOG.error("Client connection closed unexpectedly: "
+					+ client.getExitEvent().getMessage());
+		}
+
+		String msg;
+		try {
+			msg = queue.poll(waitSec, TimeUnit.SECONDS);
+		} catch (InterruptedException e) {
+			// Preserve the interrupt so the caller and the runtime can still observe it.
+			Thread.currentThread().interrupt();
+			throw new RuntimeException("'Waiting for tweet' thread is interrupted", e);
+		}
+
+		if (msg != null) {
+			collector.collect(new Tuple1<String>(msg));
+		} else if (LOG.isInfoEnabled()) {
+			LOG.info("Did not receive a message in " + waitSec + " seconds");
+		}
+	}
+
+	/**
+	 * Stops the Hosebird Client if it was created.
+	 */
+	private void closeConnection() {
+
+		if (client == null) {
+			// Initialization failed before a client existed; nothing to close.
+			return;
+		}
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Initiating connection close");
+		}
+
+		client.stop();
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Connection closed successfully");
+		}
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b3cd5fd0/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
new file mode 100644
index 0000000..805bf06
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -0,0 +1,107 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.examples.function.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+
+/**
+ * Example program that reads tweets with {@link TwitterSource}, extracts the id, an
+ * encoded creation date, the user name, the text and the language of each one, and
+ * prints them to standard out.
+ */
+public class TwitterStreaming {
+
+	private static final int PARALLELISM = 1;
+	private static final int SOURCE_PARALLELISM = 1;
+
+	/** Number of polls the {@link TwitterSource} performs before it finishes. */
+	private static final int NUMBER_OF_TWEETS = 100;
+
+	/**
+	 * Sink printing the selected fields: id, encoded date and language on the first line,
+	 * then the user name, the text and a separator line.
+	 */
+	public static class TwitterSink extends SinkFunction<Tuple5<Long, Long, String, String, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(Tuple5<Long, Long, String, String, String> tuple) {
+			System.out.println(tuple.f0 + " " + tuple.f1 + " " + tuple.f4);
+			System.out.println("NAME: " + tuple.f2);
+			System.out.println(tuple.f3);
+			System.out.println(" ");
+		}
+
+	}
+
+	/**
+	 * Extracts (id, encoded created_at, user.name, text, lang) from the JSON text of a tweet.
+	 * Missing string fields become a single space and missing numeric fields become 0.
+	 */
+	public static class SelectDataFlatMap extends
+			JSONParseFlatMap<Tuple1<String>, Tuple5<Long, Long, String, String, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(Tuple1<String> value,
+				Collector<Tuple5<Long, Long, String, String, String>> out)
+				throws Exception {
+
+			out.collect(new Tuple5<Long, Long, String, String, String>(
+					convertDateString2Long(getField(value.f0, "id")),
+					convertDateString2LongDate(getField(value.f0, "created_at")),
+					colationOfNull(getField(value.f0, "user.name")),
+					colationOfNull(getField(value.f0, "text")),
+					// Coalesced like the other string fields (and like TwitterLocal's language).
+					colationOfNull(getField(value.f0, "lang"))));
+		}
+
+		/**
+		 * Changes a null String to a single space character.
+		 *
+		 * @param in
+		 *            the string to check, may be null
+		 * @return {@code in} itself, or {@code " "} when {@code in} is null
+		 */
+		protected String colationOfNull(String in) {
+			return in == null ? " " : in;
+		}
+
+		/**
+		 * Encodes the day-of-month and the year of a date string as
+		 * {@code day * 100000 + year}, taken from the 3rd and 6th space-separated tokens.
+		 * NOTE(review): assumes Twitter's created_at layout, e.g.
+		 * "Wed Aug 27 13:08:45 +0000 2008" - confirm.
+		 *
+		 * @param dateString
+		 *            the date string, may be null
+		 * @return the encoded value, or 0 if the string is null or has fewer than six tokens
+		 */
+		protected Long convertDateString2LongDate(String dateString) {
+			if (dateString == null) {
+				return 0L;
+			}
+			String[] dateArray = dateString.split(" ");
+			if (dateArray.length < 6) {
+				// Unexpected layout: treat like a missing date instead of failing the job.
+				return 0L;
+			}
+			return Long.parseLong(dateArray[2]) * 100000 + Long.parseLong(dateArray[5]);
+		}
+
+		/**
+		 * Parses a numeric string (used here for the tweet id, despite the name).
+		 *
+		 * @param dateString
+		 *            the decimal string, may be null
+		 * @return the parsed value, or 0 if the string is null
+		 */
+		protected Long convertDateString2Long(String dateString) {
+			if (dateString != null) {
+				return Long.parseLong(dateString);
+			}
+			return 0L;
+		}
+	}
+
+	/**
+	 * Prints selected fields of tweets read from Twitter.
+	 *
+	 * @param args
+	 *            exactly one argument: the path of the properties file that holds the
+	 *            Twitter authentication data
+	 */
+	public static void main(String[] args) {
+
+		if (args == null || args.length != 1) {
+			System.err.println("USAGE:\n TwitterStreaming <pathToPropertiesFile>");
+			return;
+		}
+		String path = args[0];
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(PARALLELISM);
+
+		DataStream<Tuple1<String>> streamSource = env.addSource(
+				new TwitterSource(path, NUMBER_OF_TWEETS), SOURCE_PARALLELISM);
+
+		DataStream<Tuple5<Long, Long, String, String, String>> selectedDataStream = streamSource
+				.flatMap(new SelectDataFlatMap());
+
+		selectedDataStream.addSink(new TwitterSink());
+
+		env.execute();
+	}
+}