You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/11/19 13:43:59 UTC

[3/3] flink git commit: FLINK-3041: Twitter Streaming Description section of Streaming Programming guide refers to an incorrect example 'TwitterLocal'

FLINK-3041: Twitter Streaming Description section of Streaming Programming guide refers to an incorrect example 'TwitterLocal'

This closes #1379.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be27a188
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be27a188
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be27a188

Branch: refs/heads/release-0.10
Commit: be27a188fb2e4979bffe7b4b624e7f9243cebba5
Parents: df3347b
Author: smarthi <sm...@apache.org>
Authored: Wed Nov 18 12:48:38 2015 -0500
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 19 13:43:27 2015 +0100

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                       |  6 +++---
 .../connectors/twitter/TwitterFilterSource.java    | 17 ++++++++---------
 .../connectors/twitter/TwitterSource.java          |  4 ++--
 .../connectors/twitter/TwitterStreaming.java       |  4 ++--
 .../connectors/twitter/TwitterTopology.java        |  4 ++--
 .../streaming/connectors/json/JSONParserTest.java  |  1 -
 .../streaming/connectors/json/JSONParserTest2.java |  1 -
 .../examples/iteration/IterateExample.java         | 14 ++++++--------
 .../streaming/examples/twitter/TwitterStream.java  |  4 ++--
 .../GroupedProcessingTimeWindowExample.java        |  2 +-
 .../examples/windowing/TopSpeedWindowing.java      |  8 ++++----
 11 files changed, 30 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 3bb2ee8..a5b8b5b 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -3929,7 +3929,7 @@ In order to connect to Twitter stream the user has to register their program and
 
 #### Acquiring the authentication information
 First of all, a Twitter account is needed. Sign up for free at [twitter.com/signup](https://twitter.com/signup) or sign in at Twitter's [Application Management](https://apps.twitter.com/) and register the application by clicking on the "Create New App" button. Fill out a form about your program and accept the Terms and Conditions.
-After selecting the application, the API key and API secret (called `consumerKey` and `sonsumerSecret` in `TwitterSource` respectively) is located on the "API Keys" tab. The necessary access token data (`token` and `secret`) can be acquired here.
+After selecting the application, the API key and API secret (called `consumerKey` and `consumerSecret` in `TwitterSource` respectively) is located on the "API Keys" tab. The necessary OAuth Access Token data (`token` and `secret` in `TwitterSource`) can be generated and acquired on the "Keys and Access Tokens" tab.
 Remember to keep these pieces of information secret and do not push them to public repositories.
 
 #### Accessing the authentication information
@@ -3947,7 +3947,7 @@ consumerKey=***
 The `TwitterSource` class has two constructors.
 
 1. `public TwitterSource(String authPath, int numberOfTweets);`
-to emit finite number of tweets
+to emit a finite number of tweets
 2. `public TwitterSource(String authPath);`
 for streaming
 
@@ -3990,7 +3990,7 @@ function which can be use to acquire the value of a given field.
 There are two basic types of tweets. The usual tweets contain information such as date and time of creation, id, user, language and many more details. The other type is the delete information.
 
 #### Example
-`TwitterLocal` is an example how to use `TwitterSource`. It implements a language frequency counter program.
+`TwitterStream` is an example of how to use `TwitterSource`. It implements a language frequency counter program.
 
 [Back to top](#top)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
index 8dd4458..2894322 100644
--- a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
@@ -39,22 +39,21 @@ import com.twitter.hbc.httpclient.auth.Authentication;
  */
 public class TwitterFilterSource extends TwitterSource {
 
-	private static final Logger LOG = LoggerFactory
-			.getLogger(TwitterFilterSource.class);
+	private static final Logger LOG = LoggerFactory.getLogger(TwitterFilterSource.class);
 
 	private static final long serialVersionUID = 1L;
 
-	private List<String> trackTerms = new LinkedList<String>();
+	private List<String> trackTerms = new LinkedList<>();
 
-	private List<String> languages = new LinkedList<String>();
+	private List<String> languages = new LinkedList<>();
 
-	private List<Long> followings = new LinkedList<Long>();
+	private List<Long> followings = new LinkedList<>();
 
-	private List<Location> locations = new LinkedList<Location>();
+	private List<Location> locations = new LinkedList<>();
 
-	private Map<String, String> queryParameters = new HashMap<String, String>();
+	private Map<String, String> queryParameters = new HashMap<>();
 
-	private Map<String, String> postParameters = new HashMap<String, String>();
+	private Map<String, String> postParameters = new HashMap<>();
 
 	public TwitterFilterSource(String authPath) {
 		super(authPath);
@@ -66,7 +65,7 @@ public class TwitterFilterSource extends TwitterSource {
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Initializing Twitter Streaming API connection");
 		}
-		queue = new LinkedBlockingQueue<String>(queueSize);
+		queue = new LinkedBlockingQueue<>(queueSize);
 
 		StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
 		configEndpoint(endpoint);

http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index bad0f8c..d99af82 100644
--- a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -77,7 +77,7 @@ public class TwitterSource extends RichSourceFunction<String> {
 	 * @param authPath
 	 *            Location of the properties file containing the required
 	 *            authentication information.
-	 * @param numberOfTweets
+	 * @param numberOfTweets max number of tweets
 	 * 
 	 */
 	public TwitterSource(String authPath, int numberOfTweets) {
@@ -101,7 +101,7 @@ public class TwitterSource extends RichSourceFunction<String> {
 			LOG.info("Initializing Twitter Streaming API connection");
 		}
 
-		queue = new LinkedBlockingQueue<String>(queueSize);
+		queue = new LinkedBlockingQueue<>(queueSize);
 
 		StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
 		endpoint.stallWarnings(false);

http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index a80c32a..3e8ce1b 100644
--- a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -58,7 +58,7 @@ public class TwitterStreaming {
 		public void flatMap(String value, Collector<Tuple5<Long, Integer, String, String, String>> out)
 				throws Exception {
 			try {
-				out.collect(new Tuple5<Long, Integer, String, String, String>(
+				out.collect(new Tuple5<>(
 						getLong(value, "id"),
 						getInt(value, "entities.hashtags[0].indices[1]"),
 						getString(value, "lang"),
@@ -74,7 +74,7 @@ public class TwitterStreaming {
 
 	public static void main(String[] args) throws Exception {
 
-		String path = new String();
+		String path;
 
 		if (args != null && args.length == 1) {
 			path = args[0];

http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
index b1fc92c..d88184c 100644
--- a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
@@ -58,7 +58,7 @@ public class TwitterTopology {
 
 	public static void main(String[] args) throws Exception {
 
-		String path = new String();
+		String path;
 
 		if (args != null && args.length == 1) {
 			path = args[0];
@@ -79,7 +79,7 @@ public class TwitterTopology {
 
 					@Override
 					public Tuple2<String, Integer> map(String value) throws Exception {
-						return new Tuple2<String, Integer>(value, 1);
+						return new Tuple2<>(value, 1);
 					}
 				})
 				.keyBy(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
index b1d4115..33108c9 100644
--- a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
+++ b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.fail;
 import java.util.Arrays;
 import java.util.Collection;
 
-import org.apache.flink.streaming.connectors.json.JSONParser;
 import org.apache.sling.commons.json.JSONException;
 import org.apache.sling.commons.json.JSONObject;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
index 8851086..eb796b4 100644
--- a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
+++ b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.flink.streaming.connectors.json.JSONParser;
 import org.apache.sling.commons.json.JSONException;
 import org.apache.sling.commons.json.JSONObject;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index 52ec896..b6e1a61 100644
--- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -121,7 +121,7 @@ public class IterateExample {
 				int first = rnd.nextInt(BOUND / 2 - 1) + 1;
 				int second = rnd.nextInt(BOUND / 2 - 1) + 1;
 
-				ctx.collect(new Tuple2<Integer, Integer>(first, second));
+				ctx.collect(new Tuple2<>(first, second));
 				counter++;
 				Thread.sleep(50L);
 			}
@@ -143,7 +143,7 @@ public class IterateExample {
 		public Tuple2<Integer, Integer> map(String value) throws Exception {
 			String record = value.substring(1, value.length() - 1);
 			String[] splitted = record.split(",");
-			return new Tuple2<Integer, Integer>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
+			return new Tuple2<>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
 		}
 	}
 
@@ -158,7 +158,7 @@ public class IterateExample {
 		@Override
 		public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws
 				Exception {
-			return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f0, value.f1, 0);
+			return new Tuple5<>(value.f0, value.f1, value.f0, value.f1, 0);
 		}
 	}
 
@@ -173,8 +173,7 @@ public class IterateExample {
 		@Override
 		public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer,
 				Integer> value) throws Exception {
-			return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f3, value.f2 +
-					value.f3, ++value.f4);
+			return new Tuple5<>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4);
 		}
 	}
 
@@ -186,7 +185,7 @@ public class IterateExample {
 
 		@Override
 		public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) {
-			List<String> output = new ArrayList<String>();
+			List<String> output = new ArrayList<>();
 			if (value.f2 < BOUND && value.f3 < BOUND) {
 				output.add("iterate");
 			} else {
@@ -207,8 +206,7 @@ public class IterateExample {
 		public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer>
 				value) throws
 				Exception {
-			return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1),
-					value.f4);
+			return new Tuple2<>(new Tuple2<>(value.f0, value.f1), value.f4);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index 06872f0..d26dc42 100644
--- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -110,8 +110,8 @@ public class TwitterStream {
 					while (tokenizer.hasMoreTokens()) {
 						String result = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase();
 
-						if (result != null && !result.equals("")) {
-							out.collect(new Tuple2<String, Integer>(result, 1));
+						if (!result.equals("")) {
+							out.collect(new Tuple2<>(result, 1));
 						}
 					}
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index 982b73d..f08069b 100644
--- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -59,7 +59,7 @@ public class GroupedProcessingTimeWindowExample {
 						
 						while (running && count < numElements) {
 							count++;
-							ctx.collect(new Tuple2<Long, Long>(val++, 1L));
+							ctx.collect(new Tuple2<>(val++, 1L));
 							
 							if (val > numKeys) {
 								val = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/be27a188/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index df3402e..30eda67 100644
--- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -64,9 +64,12 @@ public class TopSpeedWindowing {
 		if (fileInput) {
 			carData = env.readTextFile(inputPath).map(new ParseCarData());
 		} else {
+			int numOfCars = 2;
 			carData = env.addSource(CarSource.create(numOfCars));
 		}
 
+		int evictionSec = 10;
+		double triggerMeters = 50;
 		DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
 				.assignTimestamps(new CarTimestamp())
 				.keyBy(0)
@@ -133,7 +136,7 @@ public class TopSpeedWindowing {
 						speeds[carId] = Math.max(0, speeds[carId] - 5);
 					}
 					distances[carId] += speeds[carId] / 3.6d;
-					Tuple4<Integer, Integer, Double, Long> record = new Tuple4<Integer, Integer, Double, Long>(carId,
+					Tuple4<Integer, Integer, Double, Long> record = new Tuple4<>(carId,
 							speeds[carId], distances[carId], System.currentTimeMillis());
 					ctx.collect(record);
 					counter++;
@@ -186,9 +189,6 @@ public class TopSpeedWindowing {
 
 	private static boolean fileInput = false;
 	private static boolean fileOutput = false;
-	private static int numOfCars = 2;
-	private static int evictionSec = 10;
-	private static double triggerMeters = 50;
 	private static String inputPath;
 	private static String outputPath;