You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2017/05/24 16:43:29 UTC
[1/2] bahir git commit: [MINOR] Update Spark dependency to release 2.1.1
Repository: bahir
Updated Branches:
refs/heads/master fd4c35fc9 -> 86ded930e
[MINOR] Update Spark dependency to release 2.1.1
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/68ed2d44
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/68ed2d44
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/68ed2d44
Branch: refs/heads/master
Commit: 68ed2d448c91ca2a79697649d8bae7b94c3a05bc
Parents: fd4c35f
Author: Luciano Resende <lr...@apache.org>
Authored: Thu May 18 00:42:17 2017 -0400
Committer: Luciano Resende <lr...@apache.org>
Committed: Thu May 18 00:42:17 2017 -0400
----------------------------------------------------------------------
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/68ed2d44/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 65129cd..f76aac5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,7 +98,7 @@
<log4j.version>1.2.17</log4j.version>
<!-- Spark version -->
- <spark.version>2.1.0</spark.version>
+ <spark.version>2.1.1</spark.version>
<!-- Streaming Akka connector -->
<akka.group>com.typesafe.akka</akka.group>
[2/2] bahir git commit: [BAHIR-117] Expand filtering options for TwitterInputDStream
Posted by lr...@apache.org.
[BAHIR-117] Expand filtering options for TwitterInputDStream
Adds a new method to TwitterUtils that enables users to pass
an arbitrary FilterQuery down to the TwitterReceiver.
This enables use cases such as receiving Tweets by location,
by user handle, etc. Previously users could only receive
Tweets matching disjunctive (OR-ed) keyword queries.
Closes #43.
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/86ded930
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/86ded930
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/86ded930
Branch: refs/heads/master
Commit: 86ded930e4af769e8191c8f415fe48193dd4914b
Parents: 68ed2d4
Author: Clemens Wolff <cl...@microsoft.com>
Authored: Thu May 4 12:52:54 2017 -0700
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed May 24 09:42:53 2017 -0700
----------------------------------------------------------------------
.../streaming/akka/JavaActorWordCount.java | 2 +-
.../streaming/twitter/TwitterLocations.scala | 92 ++++++++++++++++++++
.../streaming/twitter/TwitterInputDStream.scala | 12 ++-
.../spark/streaming/twitter/TwitterUtils.scala | 46 +++++++++-
.../twitter/JavaTwitterStreamSuite.java | 4 +
.../streaming/twitter/TwitterStreamSuite.scala | 5 +-
6 files changed, 150 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
----------------------------------------------------------------------
diff --git a/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java b/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
index 740f9f8..abc1f70 100644
--- a/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
+++ b/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
@@ -49,7 +49,7 @@ class JavaSampleActorReceiver<T> extends JavaActorReceiver {
private final String urlOfPublisher;
- public JavaSampleActorReceiver(String urlOfPublisher) {
+ JavaSampleActorReceiver(String urlOfPublisher) {
this.urlOfPublisher = urlOfPublisher;
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterLocations.scala
----------------------------------------------------------------------
diff --git a/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterLocations.scala b/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterLocations.scala
new file mode 100644
index 0000000..00859fe
--- /dev/null
+++ b/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterLocations.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming.twitter
+
+import org.apache.log4j.{Level, Logger}
+import twitter4j.FilterQuery
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.twitter._
+
+/**
+ * Illustrates the use of custom filter queries to get Tweets from one or more locations.
+ */
+object TwitterLocations {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 4 || args.length % 4 != 0) {
+      System.err.println("Usage: TwitterLocations <consumer key> <consumer secret> " +
+        "<access token> <access token secret> " +
+        "[<longitude-south-west> <latitude-south-west>" +
+        " <longitude-north-east> <latitude-north-east> ...]")
+      System.exit(1)
+    }
+
+    // Set logging level if log4j not configured (override by adding log4j.properties to classpath)
+    if (!Logger.getRootLogger.getAllAppenders.hasMoreElements) {
+      Logger.getRootLogger.setLevel(Level.WARN)
+    }
+
+    // Set the system properties so that Twitter4j library used by twitter stream
+    // can use them to generate OAuth credentials
+    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
+    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
+    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
+    System.setProperty("twitter4j.oauth.accessToken", accessToken)
+    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
+
+    // Corners are [longitude, latitude] pairs; each box is a south-west then north-east corner
+    val locationArgs = args.drop(4)
+    val boundingBoxes = if (locationArgs.isEmpty) {
+      System.out.println("No location bounding boxes specified, using defaults for New York City")
+      val nycSouthWest = Array(-74.0, 40.0)
+      val nycNorthEast = Array(-73.0, 41.0)
+      Array(nycSouthWest, nycNorthEast)
+    } else {
+      locationArgs.map(_.toDouble).grouped(2).toArray
+    }
+
+    val sparkConf = new SparkConf().setAppName("TwitterLocations")
+
+    // check Spark configuration for master URL, set it to local if not configured
+    if (!sparkConf.contains("spark.master")) {
+      sparkConf.setMaster("local[2]")
+    }
+
+    val ssc = new StreamingContext(sparkConf, Seconds(2))
+    val locationsQuery = new FilterQuery().locations(boundingBoxes : _*)
+
+    // Print Tweets from the specified coordinates: this includes Tweets geo-tagged
+    // inside the bounding boxes defined by the coordinates, as well as Tweets
+    // tagged with places inside the bounding boxes
+    TwitterUtils.createFilteredStream(ssc, None, Some(locationsQuery))
+      .map { tweet =>
+        val latLon = Option(tweet.getGeoLocation).map(l => s"${l.getLatitude},${l.getLongitude}")
+        val place = Option(tweet.getPlace).map(_.getName)
+        val location = latLon.orElse(place).getOrElse("(no location)")
+        val text = tweet.getText.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ')
+        s"$location\t$text"
+      }
+      .print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index bd23a12..81ce60d 100644
--- a/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -41,7 +41,7 @@ private[streaming]
class TwitterInputDStream(
_ssc: StreamingContext,
twitterAuth: Option[Authorization],
- filters: Seq[String],
+ query: Option[FilterQuery],
storageLevel: StorageLevel
) extends ReceiverInputDStream[Status](_ssc) {
@@ -52,14 +52,14 @@ class TwitterInputDStream(
private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
override def getReceiver(): Receiver[Status] = {
- new TwitterReceiver(authorization, filters, storageLevel)
+ new TwitterReceiver(authorization, query, storageLevel)
}
}
private[streaming]
class TwitterReceiver(
twitterAuth: Authorization,
- filters: Seq[String],
+ query: Option[FilterQuery],
storageLevel: StorageLevel
) extends Receiver[Status](storageLevel) with Logging {
@@ -85,10 +85,8 @@ class TwitterReceiver(
}
})
- val query = new FilterQuery
- if (filters.size > 0) {
- query.track(filters.mkString(","))
- newTwitterStream.filter(query)
+ if (query.isDefined) {
+ newTwitterStream.filter(query.get)
} else {
newTwitterStream.sample()
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index 9cb0106..b0e9b78 100644
--- a/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.twitter
-import twitter4j.Status
+import twitter4j.{FilterQuery, Status}
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
@@ -33,6 +33,25 @@ object TwitterUtils {
* authorization; this uses the system properties twitter4j.oauth.consumerKey,
* twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
* twitter4j.oauth.accessTokenSecret
+ * @param query A query to get only those tweets that match it
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+  def createFilteredStream(
+      ssc: StreamingContext,
+      twitterAuth: Option[Authorization],
+      query: Option[FilterQuery] = None,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): ReceiverInputDStream[Status] =
+    // A None query makes the receiver read Twitter's sample stream instead of a filtered one
+    new TwitterInputDStream(ssc, twitterAuth, query, storageLevel)
+
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param ssc StreamingContext object
+ * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
+ * authorization; this uses the system properties twitter4j.oauth.consumerKey,
+ * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+ * twitter4j.oauth.accessTokenSecret
* @param filters Set of filter strings to get only those tweets that match them
* @param storageLevel Storage level to use for storing the received objects
*/
@@ -42,7 +61,11 @@ object TwitterUtils {
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[Status] = {
- new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
+ val query = if (filters.nonEmpty) {
+ Some(new FilterQuery().track(filters.mkString(",")))
+ } else None
+
+ createFilteredStream(ssc, twitterAuth, query, storageLevel)
}
/**
@@ -129,4 +152,23 @@ object TwitterUtils {
): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
}
+
+  /**
+   * Create an input stream that returns tweets received from Twitter that match a custom
+   * filter query (for example by location or language rather than only by keywords).
+   * @param jssc JavaStreamingContext object
+   * @param twitterAuth Twitter4J authentication
+   * @param query A query to get only those tweets that match it; if null, no filtering is
+   *              done and tweets are taken from Twitter's sample stream instead
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def createFilteredStream(
+      jssc: JavaStreamingContext,
+      twitterAuth: Authorization,
+      query: FilterQuery,
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[Status] = {
+    // Option(...) turns a null query from Java into None instead of Some(null)
+    createFilteredStream(jssc.ssc, Some(twitterAuth), Option(query), storageLevel)
+  }
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/streaming-twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
index 26ec8af..e22e24e 100644
--- a/streaming-twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
+++ b/streaming-twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.twitter;
import org.junit.Test;
+import twitter4j.FilterQuery;
import twitter4j.Status;
import twitter4j.auth.Authorization;
import twitter4j.auth.NullAuthorization;
@@ -30,6 +31,7 @@ public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
public void testTwitterStream() {
String[] filters = { "filter1", "filter2" };
Authorization auth = NullAuthorization.getInstance();
+ FilterQuery query = new FilterQuery().language("en,es");
// tests the API, does not actually test data receiving
JavaDStream<Status> test1 = TwitterUtils.createStream(ssc);
@@ -40,5 +42,7 @@ public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
JavaDStream<Status> test5 = TwitterUtils.createStream(ssc, auth, filters);
JavaDStream<Status> test6 = TwitterUtils.createStream(ssc,
auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<Status> test7 = TwitterUtils.createFilteredStream(ssc,
+ auth, query, StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index bd23831..3f1babd 100644
--- a/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.streaming.twitter
import org.scalatest.BeforeAndAfter
-import twitter4j.Status
+import twitter4j.{FilterQuery, Status}
import twitter4j.auth.{Authorization, NullAuthorization}
import org.apache.spark.SparkFunSuite
@@ -38,6 +38,7 @@ class TwitterStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging
test("twitter input stream") {
val ssc = new StreamingContext(master, framework, batchDuration)
val filters = Seq("filter1", "filter2")
+ val query = new FilterQuery().language("fr,es")
val authorization: Authorization = NullAuthorization.getInstance()
// tests the API, does not actually test data receiving
@@ -52,6 +53,8 @@ class TwitterStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging
TwitterUtils.createStream(ssc, Some(authorization), filters)
val test6: ReceiverInputDStream[Status] = TwitterUtils.createStream(
ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test7: ReceiverInputDStream[Status] = TwitterUtils.createFilteredStream(
+ ssc, Some(authorization), Some(query), StorageLevel.MEMORY_AND_DISK_SER_2)
// Note that actually testing the data receiving is hard as authentication keys are
// necessary for accessing Twitter live stream