You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2019/01/10 00:12:25 UTC
bahir git commit: [BAHIR-65] Twitter integration test
Repository: bahir
Updated Branches:
refs/heads/master d9b430a0c -> a45bd8421
[BAHIR-65] Twitter integration test
Closes #80
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/a45bd842
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/a45bd842
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/a45bd842
Branch: refs/heads/master
Commit: a45bd84210b8e68640f97dd328e7e7053c8276e6
Parents: d9b430a
Author: Lukasz Antoniak <lu...@gmail.com>
Authored: Thu Dec 20 10:39:24 2018 -0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Jan 9 16:11:58 2019 -0800
----------------------------------------------------------------------
streaming-twitter/README.md | 16 ++++-
.../streaming/twitter/TwitterStreamSuite.scala | 69 +++++++++++++++++---
2 files changed, 74 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/a45bd842/streaming-twitter/README.md
----------------------------------------------------------------------
diff --git a/streaming-twitter/README.md b/streaming-twitter/README.md
index 0b3d37a..4123ea9 100644
--- a/streaming-twitter/README.md
+++ b/streaming-twitter/README.md
@@ -45,4 +45,18 @@ can be provided by any of the [methods](http://twitter4j.org/en/configuration.ht
You can also either get the public stream, or get the filtered stream based on keywords.
-See end-to-end examples at [Twitter Examples](https://github.com/apache/bahir/tree/master/streaming-twitter/examples)
\ No newline at end of file
+See end-to-end examples at [Twitter Examples](https://github.com/apache/bahir/tree/master/streaming-twitter/examples).
+
+## Unit Test
+
+Executing integration tests requires users to register a custom application at the
+[Twitter Developer Portal](https://developer.twitter.com) and obtain private OAuth credentials.
+The listing below shows how to run the complete test suite on a local workstation.
+
+ cd streaming-twitter
+ env ENABLE_TWITTER_TESTS=1 \
+ twitter4j.oauth.consumerKey=${consumer key} \
+ twitter4j.oauth.consumerSecret=${consumer secret} \
+ twitter4j.oauth.accessToken=${access token} \
+ twitter4j.oauth.accessTokenSecret=${access token secret} \
+ mvn clean test
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bahir/blob/a45bd842/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index 3f1babd..7d7886d 100644
--- a/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -17,26 +17,40 @@
package org.apache.spark.streaming.twitter
+import java.util.UUID
+
+import scala.collection.mutable
+
import org.scalatest.BeforeAndAfter
-import twitter4j.{FilterQuery, Status}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time
+import org.scalatest.time.Span
+import twitter4j.{FilterQuery, Status, TwitterFactory}
import twitter4j.auth.{Authorization, NullAuthorization}
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.ConditionalSparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
-class TwitterStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
+class TwitterStreamSuite extends ConditionalSparkFunSuite
+ with Eventually with BeforeAndAfter with Logging {
+ def shouldRunTest(): Boolean = sys.env.get("ENABLE_TWITTER_TESTS").contains("1")
- val batchDuration = Seconds(1)
+ var ssc: StreamingContext = _
- private val master: String = "local[2]"
+ before {
+ ssc = new StreamingContext("local[2]", this.getClass.getSimpleName, Seconds(1))
+ }
- private val framework: String = this.getClass.getSimpleName
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ }
test("twitter input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
val filters = Seq("filter1", "filter2")
val query = new FilterQuery().language("fr,es")
val authorization: Authorization = NullAuthorization.getInstance()
@@ -55,9 +69,44 @@ class TwitterStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging
ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
val test7: ReceiverInputDStream[Status] = TwitterUtils.createFilteredStream(
ssc, Some(authorization), Some(query), StorageLevel.MEMORY_AND_DISK_SER_2)
+ }
+
+ testIf("messages received", shouldRunTest) {
+ val userId = TwitterFactory.getSingleton.updateStatus(
+ UUID.randomUUID().toString
+ ).getUser.getId
+
+ val receiveStream = TwitterUtils.createFilteredStream(
+ ssc, None, Some(new FilterQuery().follow(userId))
+ )
+ @volatile var receivedMessages: mutable.Set[Status] = mutable.Set()
+ receiveStream.foreachRDD { rdd =>
+ for (element <- rdd.collect()) {
+ receivedMessages += element
+ }
+ receivedMessages
+ }
+ ssc.start()
+
+ val nbOfMsg = 2
+ var publishedMessages: List[String] = List()
+
+ (1 to nbOfMsg).foreach(
+ _ => {
+ publishedMessages = UUID.randomUUID().toString :: publishedMessages
+ }
+ )
- // Note that actually testing the data receiving is hard as authentication keys are
- // necessary for accessing Twitter live stream
- ssc.stop()
+ eventually(timeout(Span(15, time.Seconds)), interval(Span(1000, time.Millis))) {
+ publishedMessages.foreach(
+ m => if (!receivedMessages.map(m => m.getText).contains(m.toString)) {
+ TwitterFactory.getSingleton.updateStatus(m)
+ }
+ )
+ assert(
+ publishedMessages.map(m => m.toString).toSet
+ .subsetOf(receivedMessages.map(m => m.getText))
+ )
+ }
}
}