You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2017/05/24 16:43:29 UTC

[1/2] bahir git commit: [MINOR] Update Spark dependency to release 2.1.1

Repository: bahir
Updated Branches:
  refs/heads/master fd4c35fc9 -> 86ded930e


[MINOR] Update Spark dependency to release 2.1.1


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/68ed2d44
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/68ed2d44
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/68ed2d44

Branch: refs/heads/master
Commit: 68ed2d448c91ca2a79697649d8bae7b94c3a05bc
Parents: fd4c35f
Author: Luciano Resende <lr...@apache.org>
Authored: Thu May 18 00:42:17 2017 -0400
Committer: Luciano Resende <lr...@apache.org>
Committed: Thu May 18 00:42:17 2017 -0400

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/68ed2d44/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 65129cd..f76aac5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,7 +98,7 @@
     <log4j.version>1.2.17</log4j.version>
 
     <!-- Spark version -->
-    <spark.version>2.1.0</spark.version>
+    <spark.version>2.1.1</spark.version>
 
     <!-- Streaming Akka connector -->
     <akka.group>com.typesafe.akka</akka.group>


[2/2] bahir git commit: [BAHIR-117] Expand filtering options for TwitterInputDStream

Posted by lr...@apache.org.
[BAHIR-117] Expand filtering options for TwitterInputDStream

Adds a new method to TwitterUtils that enables users to pass
an arbitrary FilterQuery down to the TwitterReceiver.

This enables use-cases like receiving Tweets based on location,
based on handle, etc. Previously users were only able to receive
Tweets based on disjunctive keyword queries.

Closes #43.


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/86ded930
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/86ded930
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/86ded930

Branch: refs/heads/master
Commit: 86ded930e4af769e8191c8f415fe48193dd4914b
Parents: 68ed2d4
Author: Clemens Wolff <cl...@microsoft.com>
Authored: Thu May 4 12:52:54 2017 -0700
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed May 24 09:42:53 2017 -0700

----------------------------------------------------------------------
 .../streaming/akka/JavaActorWordCount.java      |  2 +-
 .../streaming/twitter/TwitterLocations.scala    | 92 ++++++++++++++++++++
 .../streaming/twitter/TwitterInputDStream.scala | 12 ++-
 .../spark/streaming/twitter/TwitterUtils.scala  | 46 +++++++++-
 .../twitter/JavaTwitterStreamSuite.java         |  4 +
 .../streaming/twitter/TwitterStreamSuite.scala  |  5 +-
 6 files changed, 150 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
----------------------------------------------------------------------
diff --git a/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java b/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
index 740f9f8..abc1f70 100644
--- a/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
+++ b/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
@@ -49,7 +49,7 @@ class JavaSampleActorReceiver<T> extends JavaActorReceiver {
 
   private final String urlOfPublisher;
 
-  public JavaSampleActorReceiver(String urlOfPublisher) {
+  JavaSampleActorReceiver(String urlOfPublisher) {
     this.urlOfPublisher = urlOfPublisher;
   }
 

http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterLocations.scala
----------------------------------------------------------------------
diff --git a/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterLocations.scala b/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterLocations.scala
new file mode 100644
index 0000000..00859fe
--- /dev/null
+++ b/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterLocations.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming.twitter
+
+import org.apache.log4j.{Level, Logger}
+import twitter4j.FilterQuery
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.twitter._
+
+/**
+ * Illustrates the use of custom filter queries to get Tweets from one or more locations.
+ */
+object TwitterLocations {
+  def main(args: Array[String]) {
+    if (args.length < 4 || args.length % 4 != 0) {
+      System.err.println("Usage: TwitterLocations <consumer key> <consumer secret> " +
+        "<access token> <access token secret> " +
+        "[<latitude-south-west> <longitude-south-west>" +
+        " <latitude-north-east> <longitude-north-east> ...]")
+      System.exit(1)
+    }
+
+    // Set logging level if log4j not configured (override by adding log4j.properties to classpath)
+    if (!Logger.getRootLogger.getAllAppenders.hasMoreElements) {
+      Logger.getRootLogger.setLevel(Level.WARN)
+    }
+
+    // Set the system properties so that Twitter4j library used by twitter stream
+    // can use them to generate OAuth credentials
+    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
+    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
+    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
+    System.setProperty("twitter4j.oauth.accessToken", accessToken)
+    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
+
+    // Get bounding boxes of locations for which to retrieve Tweets from command line
+    val locationArgs = args.takeRight(args.length - 4)
+    val boundingBoxes = if (locationArgs.length == 0) {
+      System.out.println("No location bounding boxes specified, using defaults for New York City")
+      val nycSouthWest = Array(-74.0, 40.0)
+      val nycNorthEast = Array(-73.0, 41.0)
+      Array(nycSouthWest, nycNorthEast)
+    } else {
+      locationArgs.map(_.toDouble).sliding(2, 2).toArray
+    }
+
+    val sparkConf = new SparkConf().setAppName("TwitterLocations")
+
+    // check Spark configuration for master URL, set it to local if not configured
+    if (!sparkConf.contains("spark.master")) {
+      sparkConf.setMaster("local[2]")
+    }
+
+    val ssc = new StreamingContext(sparkConf, Seconds(2))
+    val locationsQuery = new FilterQuery().locations(boundingBoxes : _*)
+
+    // Print Tweets from the specified coordinates
+    // This includes Tweets geo-tagged in the bounding box defined by the coordinates
+    // As well as Tweets tagged in places inside of the bounding box
+    TwitterUtils.createFilteredStream(ssc, None, Some(locationsQuery))
+      .map(tweet => {
+        val latitude = Option(tweet.getGeoLocation).map(l => s"${l.getLatitude},${l.getLongitude}")
+        val place = Option(tweet.getPlace).map(_.getName)
+        val location = latitude.getOrElse(place.getOrElse("(no location)"))
+        val text = tweet.getText.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ')
+        s"$location\t$text"
+      })
+      .print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index bd23a12..81ce60d 100644
--- a/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -41,7 +41,7 @@ private[streaming]
 class TwitterInputDStream(
     _ssc: StreamingContext,
     twitterAuth: Option[Authorization],
-    filters: Seq[String],
+    query: Option[FilterQuery],
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[Status](_ssc)  {
 
@@ -52,14 +52,14 @@ class TwitterInputDStream(
   private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
 
   override def getReceiver(): Receiver[Status] = {
-    new TwitterReceiver(authorization, filters, storageLevel)
+    new TwitterReceiver(authorization, query, storageLevel)
   }
 }
 
 private[streaming]
 class TwitterReceiver(
     twitterAuth: Authorization,
-    filters: Seq[String],
+    query: Option[FilterQuery],
     storageLevel: StorageLevel
   ) extends Receiver[Status](storageLevel) with Logging {
 
@@ -85,10 +85,8 @@ class TwitterReceiver(
         }
       })
 
-      val query = new FilterQuery
-      if (filters.size > 0) {
-        query.track(filters.mkString(","))
-        newTwitterStream.filter(query)
+      if (query.isDefined) {
+        newTwitterStream.filter(query.get)
       } else {
         newTwitterStream.sample()
       }

http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index 9cb0106..b0e9b78 100644
--- a/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.twitter
 
-import twitter4j.Status
+import twitter4j.{FilterQuery, Status}
 import twitter4j.auth.Authorization
 
 import org.apache.spark.storage.StorageLevel
@@ -33,6 +33,25 @@ object TwitterUtils {
    *        authorization; this uses the system properties twitter4j.oauth.consumerKey,
    *        twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
    *        twitter4j.oauth.accessTokenSecret
+   * @param query A query to get only those tweets that match it
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def createFilteredStream(
+      ssc: StreamingContext,
+      twitterAuth: Option[Authorization],
+      query: Option[FilterQuery] = None,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): ReceiverInputDStream[Status] = {
+    new TwitterInputDStream(ssc, twitterAuth, query, storageLevel)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param ssc         StreamingContext object
+   * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
+   *        authorization; this uses the system properties twitter4j.oauth.consumerKey,
+   *        twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+   *        twitter4j.oauth.accessTokenSecret
    * @param filters Set of filter strings to get only those tweets that match them
    * @param storageLevel Storage level to use for storing the received objects
    */
@@ -42,7 +61,11 @@ object TwitterUtils {
       filters: Seq[String] = Nil,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): ReceiverInputDStream[Status] = {
-    new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
+    val query = if (filters.nonEmpty) {
+      Some(new FilterQuery().track(filters.mkString(",")))
+    } else None
+
+    createFilteredStream(ssc, twitterAuth, query, storageLevel)
   }
 
   /**
@@ -129,4 +152,23 @@ object TwitterUtils {
     ): JavaReceiverInputDStream[Status] = {
     createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
   }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param jssc         JavaStreamingContext object
+   * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
+   *        authorization; this uses the system properties twitter4j.oauth.consumerKey,
+   *        twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+   *        twitter4j.oauth.accessTokenSecret
+   * @param query A query to get only those tweets that match it
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def createFilteredStream(
+      jssc: JavaStreamingContext,
+      twitterAuth: Authorization,
+      query: FilterQuery,
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[Status] = {
+    createFilteredStream(jssc.ssc, Some(twitterAuth), Some(query), storageLevel)
+  }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/streaming-twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
index 26ec8af..e22e24e 100644
--- a/streaming-twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
+++ b/streaming-twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.twitter;
 
 import org.junit.Test;
+import twitter4j.FilterQuery;
 import twitter4j.Status;
 import twitter4j.auth.Authorization;
 import twitter4j.auth.NullAuthorization;
@@ -30,6 +31,7 @@ public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
   public void testTwitterStream() {
     String[] filters = { "filter1", "filter2" };
     Authorization auth = NullAuthorization.getInstance();
+    FilterQuery query = new FilterQuery().language("en,es");
 
     // tests the API, does not actually test data receiving
     JavaDStream<Status> test1 = TwitterUtils.createStream(ssc);
@@ -40,5 +42,7 @@ public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
     JavaDStream<Status> test5 = TwitterUtils.createStream(ssc, auth, filters);
     JavaDStream<Status> test6 = TwitterUtils.createStream(ssc,
       auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaDStream<Status> test7 = TwitterUtils.createFilteredStream(ssc,
+      auth, query, StorageLevel.MEMORY_AND_DISK_SER_2());
   }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index bd23831..3f1babd 100644
--- a/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.twitter
 
 import org.scalatest.BeforeAndAfter
-import twitter4j.Status
+import twitter4j.{FilterQuery, Status}
 import twitter4j.auth.{Authorization, NullAuthorization}
 
 import org.apache.spark.SparkFunSuite
@@ -38,6 +38,7 @@ class TwitterStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging
   test("twitter input stream") {
     val ssc = new StreamingContext(master, framework, batchDuration)
     val filters = Seq("filter1", "filter2")
+    val query = new FilterQuery().language("fr,es")
     val authorization: Authorization = NullAuthorization.getInstance()
 
     // tests the API, does not actually test data receiving
@@ -52,6 +53,8 @@ class TwitterStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging
       TwitterUtils.createStream(ssc, Some(authorization), filters)
     val test6: ReceiverInputDStream[Status] = TwitterUtils.createStream(
       ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test7: ReceiverInputDStream[Status] = TwitterUtils.createFilteredStream(
+      ssc, Some(authorization), Some(query), StorageLevel.MEMORY_AND_DISK_SER_2)
 
     // Note that actually testing the data receiving is hard as authentication keys are
     // necessary for accessing Twitter live stream