You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2019/01/10 00:12:25 UTC

bahir git commit: [BAHIR-65] Twitter integration test

Repository: bahir
Updated Branches:
  refs/heads/master d9b430a0c -> a45bd8421


[BAHIR-65] Twitter integration test

Closes #80


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/a45bd842
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/a45bd842
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/a45bd842

Branch: refs/heads/master
Commit: a45bd84210b8e68640f97dd328e7e7053c8276e6
Parents: d9b430a
Author: Lukasz Antoniak <lu...@gmail.com>
Authored: Thu Dec 20 10:39:24 2018 -0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Wed Jan 9 16:11:58 2019 -0800

----------------------------------------------------------------------
 streaming-twitter/README.md                     | 16 ++++-
 .../streaming/twitter/TwitterStreamSuite.scala  | 69 +++++++++++++++++---
 2 files changed, 74 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/a45bd842/streaming-twitter/README.md
----------------------------------------------------------------------
diff --git a/streaming-twitter/README.md b/streaming-twitter/README.md
index 0b3d37a..4123ea9 100644
--- a/streaming-twitter/README.md
+++ b/streaming-twitter/README.md
@@ -45,4 +45,18 @@ can be provided by any of the [methods](http://twitter4j.org/en/configuration.ht
 
 
 You can also either get the public stream, or get the filtered stream based on keywords. 
-See end-to-end examples at [Twitter Examples](https://github.com/apache/bahir/tree/master/streaming-twitter/examples)
\ No newline at end of file
+See end-to-end examples at [Twitter Examples](https://github.com/apache/bahir/tree/master/streaming-twitter/examples).
+
+## Integration Tests
+
+Running the integration tests requires registering a custom application on the
+[Twitter Developer Portal](https://developer.twitter.com) and obtaining private OAuth credentials.
+The listing below shows how to run the complete test suite on a local workstation.
+
+    cd streaming-twitter
+    env ENABLE_TWITTER_TESTS=1 \
+        twitter4j.oauth.consumerKey=${consumer key} \
+        twitter4j.oauth.consumerSecret=${consumer secret} \
+        twitter4j.oauth.accessToken=${access token} \
+        twitter4j.oauth.accessTokenSecret=${access token secret} \
+        mvn clean test
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bahir/blob/a45bd842/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index 3f1babd..7d7886d 100644
--- a/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -17,26 +17,40 @@
 
 package org.apache.spark.streaming.twitter
 
+import java.util.UUID
+
+import scala.collection.mutable
+
 import org.scalatest.BeforeAndAfter
-import twitter4j.{FilterQuery, Status}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time
+import org.scalatest.time.Span
+import twitter4j.{FilterQuery, Status, TwitterFactory}
 import twitter4j.auth.{Authorization, NullAuthorization}
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.ConditionalSparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
-class TwitterStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
+class TwitterStreamSuite extends ConditionalSparkFunSuite
+    with Eventually with BeforeAndAfter with Logging {
+  /** Live tests need OAuth credentials, so they only run when ENABLE_TWITTER_TESTS=1. */
+  def shouldRunTest(): Boolean = sys.env.getOrElse("ENABLE_TWITTER_TESTS", "") == "1"

-  val batchDuration = Seconds(1)
+  var ssc: StreamingContext = _

-  private val master: String = "local[2]"
+  before {
+    ssc = new StreamingContext("local[2]", getClass.getSimpleName, Seconds(1))
+  }

-  private val framework: String = this.getClass.getSimpleName
+  after {
+    // Option(...) skips teardown when no context was ever assigned (ssc starts as null).
+    Option(ssc).foreach(_.stop())
+  }
 
   test("twitter input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
     val filters = Seq("filter1", "filter2")
     val query = new FilterQuery().language("fr,es")
     val authorization: Authorization = NullAuthorization.getInstance()
@@ -55,9 +69,44 @@ class TwitterStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging
       ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
     val test7: ReceiverInputDStream[Status] = TwitterUtils.createFilteredStream(
       ssc, Some(authorization), Some(query), StorageLevel.MEMORY_AND_DISK_SER_2)
+  }
+
+  /**
+   * End-to-end check against the live Twitter API. Only runs when
+   * ENABLE_TWITTER_TESTS=1 and twitter4j OAuth credentials are configured:
+   * it publishes a few random statuses from the authenticated account and
+   * waits until a filtered stream following that account has delivered them.
+   */
+  testIf("messages received", shouldRunTest) {
+    val twitter = TwitterFactory.getSingleton
+    // Look up the authenticated account instead of posting a throwaway status
+    // just to learn its id.
+    val userId = twitter.verifyCredentials().getId
+
+    val receiveStream = TwitterUtils.createFilteredStream(
+      ssc, None, Some(new FilterQuery().follow(userId))
+    )
+
+    // The foreachRDD callback is invoked by Spark's streaming scheduler while
+    // `eventually` polls from the test thread, so the set is only touched under its lock.
+    val receivedTexts = mutable.Set[String]()
+    receiveStream.foreachRDD { rdd =>
+      val texts = rdd.collect().map(_.getText)
+      receivedTexts.synchronized { receivedTexts ++= texts }
+    }
+    ssc.start()
+
+    // Random UUIDs keep each run's statuses unique, so they can be matched
+    // by text in the stream.
+    val nbOfMsg = 2
+    val publishedMessages = List.fill(nbOfMsg)(UUID.randomUUID().toString)

-    // Note that actually testing the data receiving is hard as authentication keys are
-    // necessary for accessing Twitter live stream
-    ssc.stop()
+    eventually(timeout(Span(15, time.Seconds)), interval(Span(1000, time.Millis))) {
+      val received = receivedTexts.synchronized { receivedTexts.toSet }
+      // The stream may not be connected yet when a status is first posted, so
+      // re-publish whatever has not been observed so far.
+      publishedMessages.filterNot(received).foreach(m => twitter.updateStatus(m))
+      assert(publishedMessages.toSet.subsetOf(received))
+    }
   }
 }