You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2018/05/08 02:22:52 UTC
incubator-gearpump git commit: [GEARPUMP-377] Add TwitterSource and
examples
Repository: incubator-gearpump
Updated Branches:
refs/heads/master ead442cba -> 504bcf39c
[GEARPUMP-377] Add TwitterSource and examples
Author: manuzhang <ow...@gmail.com>
Closes #247 from manuzhang/twitter_source.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/504bcf39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/504bcf39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/504bcf39
Branch: refs/heads/master
Commit: 504bcf39cd7c64127d3290f56c644f07bf5dd7b5
Parents: ead442c
Author: manuzhang <ow...@gmail.com>
Authored: Tue May 8 10:22:32 2018 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue May 8 10:22:36 2018 +0800
----------------------------------------------------------------------
.../examples/twitter/TwitterExamples.scala | 73 ++++++++++
.../streaming/twitter/TwitterSource.scala | 135 +++++++++++++++++++
.../streaming/twitter/TwitterSourceSpec.scala | 64 +++++++++
project/BuildExamples.scala | 9 +-
project/BuildExternals.scala | 12 ++
5 files changed, 292 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/504bcf39/examples/streaming/twitter/src/main/scala/org/apache/gearpump/streaming/examples/twitter/TwitterExamples.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/twitter/src/main/scala/org/apache/gearpump/streaming/examples/twitter/TwitterExamples.scala b/examples/streaming/twitter/src/main/scala/org/apache/gearpump/streaming/examples/twitter/TwitterExamples.scala
new file mode 100644
index 0000000..0b8722e
--- /dev/null
+++ b/examples/streaming/twitter/src/main/scala/org/apache/gearpump/streaming/examples/twitter/TwitterExamples.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.twitter
+
+import java.time.Duration
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp}
+import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindows}
+import org.apache.gearpump.streaming.twitter.TwitterSource
+import org.apache.gearpump.util.AkkaApp
+import twitter4j.conf.ConfigurationBuilder
+
+/** Example application counting Twitter hashtags per one-minute event-time window. */
+object TwitterExamples extends AkkaApp with ArgumentsParser {
+  // Names of the required command line options carrying the Twitter OAuth credentials.
+  val CONSUMER_KEY = "consumer-key"
+  val CONSUMER_SECRET = "consumer-secret"
+  val TOKEN = "token"
+  val TOKEN_SECRET = "token-secret"
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    CONSUMER_KEY -> CLIOption[String]("consumer key", required = true),
+    CONSUMER_SECRET -> CLIOption[String]("consumer secret", required = true),
+    TOKEN -> CLIOption[String]("token", required = true),
+    TOKEN_SECRET -> CLIOption[String]("token secret", required = true)
+  )
+
+  /** Builds the hashtag-count stream application from `args` and submits it. */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val twitterConf = new ConfigurationBuilder()
+      .setOAuthConsumerKey(config.getString(CONSUMER_KEY))
+      .setOAuthConsumerSecret(config.getString(CONSUMER_SECRET))
+      .setOAuthAccessToken(config.getString(TOKEN))
+      .setOAuthAccessTokenSecret(config.getString(TOKEN_SECRET))
+      .build()
+
+    val twitterSource = TwitterSource(twitterConf)
+
+    val context: ClientContext = ClientContext(akkaConf)
+    try {
+      val app = StreamApp("TwitterExample", context)
+      // Split tweets on whitespace, keep hashtags and count each one per window.
+      app.source[String](twitterSource)
+        .flatMap(tweet => tweet.split("[\\s]+"))
+        .filter(_.startsWith("#"))
+        .map((_, 1))
+        .window(FixedWindows.apply(Duration.ofMinutes(1)).triggering(EventTimeTrigger))
+        .groupBy(_._1)
+        .sum
+        .sink(new LoggerSink)
+      context.submit(app).waitUntilFinish()
+    } finally context.close() // release the client even if building or submitting fails
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/504bcf39/external/twitter/src/main/scala/org/apache/gearpump/streaming/twitter/TwitterSource.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/gearpump/streaming/twitter/TwitterSource.scala b/external/twitter/src/main/scala/org/apache/gearpump/streaming/twitter/TwitterSource.scala
new file mode 100644
index 0000000..9fe94ea
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/gearpump/streaming/twitter/TwitterSource.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.twitter
+
+import java.time.Instant
+import java.util.concurrent.LinkedBlockingQueue
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.streaming.twitter.TwitterSource.{Factory, MessageListener}
+import twitter4j._
+import twitter4j.conf.Configuration
+
+class TwitterSource private[twitter](
+    twitterFactory: Factory,
+    filterQuery: Option[FilterQuery],
+    statusListener: MessageListener
+) extends DataSource {
+
+  // Stays null until open() has been called.
+  private var twitterStream: TwitterStream = _
+
+  /**
+   * Opens connection to data source
+   * invoked in onStart() method of [[org.apache.gearpump.streaming.source.DataSourceTask]]
+   *
+   * Obtains a stream from the factory, registers the listener and consumes the
+   * filtered stream if a query was given, the sample stream otherwise.
+   *
+   * @param context is the task context at runtime
+   * @param startTime is the start time of system
+   */
+  override def open(context: TaskContext, startTime: Instant): Unit = {
+    val stream = twitterFactory.getTwitterStream
+    twitterStream = stream
+    stream.addListener(statusListener)
+    filterQuery.fold(stream.sample())(query => stream.filter(query))
+  }
+
+  /**
+   * Reads next message from data source and
+   * returns null if no message is available
+   *
+   * The message carries the status text and is stamped with the current time.
+   *
+   * @return a [[org.apache.gearpump.Message]] or null
+   */
+  override def read(): Message = {
+    val status = statusListener.poll()
+    if (status == null) {
+      null
+    } else {
+      Message(status.getText, Instant.now())
+    }
+  }
+
+  /**
+   * Closes connection to data source.
+   * invoked in onStop() method of [[org.apache.gearpump.streaming.source.DataSourceTask]]
+   *
+   * Does nothing if open() never created a stream.
+   */
+  override def close(): Unit = Option(twitterStream).foreach(_.shutdown())
+
+  /**
+   * Returns a watermark such that no timestamp earlier than the watermark should enter the system
+   * Watermark.MAX mark the end of source data; this source always reports the current time.
+   */
+  override def getWatermark: Instant = Instant.now()
+}
+
+object TwitterSource {
+
+  /**
+   * Status listener buffering incoming statuses in a bounded queue
+   * until the source polls them.
+   */
+  class MessageListener extends StatusListener with Serializable {
+
+    // Holds at most 100000 statuses; offer() returns without inserting when it is full.
+    private val queue = new LinkedBlockingQueue[Status](100000)
+
+    /** Removes and returns the oldest buffered status, or null if there is none. */
+    def poll(): Status = queue.poll()
+
+    override def onStatus(status: Status): Unit = queue.offer(status)
+
+    /** Rethrows any exception reported to this listener. */
+    override def onException(ex: Exception): Unit = throw ex
+
+    // The remaining notifications are ignored.
+    override def onStallWarning(warning: StallWarning): Unit = ()
+    override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = ()
+    override def onScrubGeo(userId: Long, upToStatusId: Long): Unit = ()
+    override def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = ()
+  }
+
+  /**
+   * Wrapper around TwitterStreamFactory which is final class and
+   * can not be mocked
+   */
+  class Factory(factory: TwitterStreamFactory) extends Serializable {
+
+    /** Returns a stream instance obtained from the wrapped factory. */
+    def getTwitterStream: TwitterStream = factory.getInstance()
+  }
+
+  /** Creates a source reading the sample stream, i.e. without a filter query. */
+  def apply(conf: Configuration): TwitterSource = {
+    val streamFactory = new Factory(new TwitterStreamFactory(conf))
+    new TwitterSource(streamFactory, None, new MessageListener)
+  }
+
+  /** Creates a source reading the stream filtered by `query` (sample stream if null). */
+  def apply(conf: Configuration, query: FilterQuery): TwitterSource = {
+    val streamFactory = new Factory(new TwitterStreamFactory(conf))
+    new TwitterSource(streamFactory, Option(query), new MessageListener)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/504bcf39/external/twitter/src/main/test/scala/org/apache/gearpump/streaming/twitter/TwitterSourceSpec.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/test/scala/org/apache/gearpump/streaming/twitter/TwitterSourceSpec.scala b/external/twitter/src/main/test/scala/org/apache/gearpump/streaming/twitter/TwitterSourceSpec.scala
new file mode 100644
index 0000000..a7ac8fb
--- /dev/null
+++ b/external/twitter/src/main/test/scala/org/apache/gearpump/streaming/twitter/TwitterSourceSpec.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.twitter
+
+import java.time.Instant
+
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.twitter.TwitterSource.{Factory, MessageListener}
+import org.mockito.Mockito._
+import org.scalacheck.{Arbitrary, Gen}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.prop.PropertyChecks
+import twitter4j.{FilterQuery, TwitterStream}
+
+/** Checks that TwitterSource drives its stream and listener on open, read and close. */
+class TwitterSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  // Generates both flavours of source: unfiltered (None) and filtered (Some).
+  implicit val arbQuery: Arbitrary[Option[FilterQuery]] = Arbitrary {
+    Gen.oneOf(None, Some(new FilterQuery()))
+  }
+
+  property("TwitterSource should properly setup, poll message and teardown") {
+    forAll { (query: Option[FilterQuery], startTime: Long) =>
+      // fresh mocks for every generated case
+      val mockFactory = mock[Factory]
+      val mockStream = mock[TwitterStream]
+      val mockListener = mock[MessageListener]
+      when(mockFactory.getTwitterStream).thenReturn(mockStream)
+
+      val source = new TwitterSource(mockFactory, query, mockListener)
+
+      // open() must register the listener and start the matching stream
+      source.open(MockUtil.mockTaskContext, Instant.ofEpochMilli(startTime))
+      verify(mockStream).addListener(mockListener)
+      // sample() is expected for None, filter(q) for Some(q)
+      query.fold(verify(mockStream).sample())(q => verify(mockStream).filter(q))
+
+      // read() must poll the listener
+      source.read()
+      verify(mockListener).poll()
+
+      // close() must shut the stream down
+      source.close()
+      verify(mockStream).shutdown()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/504bcf39/project/BuildExamples.scala
----------------------------------------------------------------------
diff --git a/project/BuildExamples.scala b/project/BuildExamples.scala
index 47aa0c6..7cc8807 100644
--- a/project/BuildExamples.scala
+++ b/project/BuildExamples.scala
@@ -37,7 +37,8 @@ object BuildExamples extends sbt.Build {
wordcount,
wordcountJava,
example_hbase,
- example_kudu
+ example_kudu,
+ example_twitter
)
/**
@@ -159,6 +160,12 @@ object BuildExamples extends sbt.Build {
).dependsOn(core % "provided", streaming % "provided; test->test",
external_hadoopfs, external_monoid, external_serializer, external_kafka)
+ lazy val example_twitter = Project( // sbt project for the Twitter streaming example
+ id = "gearpump-examples-twitter",
+ base = file("examples/streaming/twitter"),
+ settings = exampleSettings("org.apache.gearpump.streaming.examples.twitter.TwitterExamples")
+ ).dependsOn(core % "provided", streaming % "provided; test->test", external_twitter)
+
private def exampleSettings(className: String): Seq[Def.Setting[_]] =
commonSettings ++ noPublish ++ myAssemblySettings ++ Seq(
mainClass in(Compile, packageBin) :=
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/504bcf39/project/BuildExternals.scala
----------------------------------------------------------------------
diff --git a/project/BuildExternals.scala b/project/BuildExternals.scala
index 698af6c..6c1289c 100644
--- a/project/BuildExternals.scala
+++ b/project/BuildExternals.scala
@@ -134,4 +134,16 @@ object BuildExternals extends sbt.Build {
))
.dependsOn(core % "provided", streaming % "test->test; provided")
.disablePlugins(sbtassembly.AssemblyPlugin)
+
+ lazy val external_twitter = Project( // sbt project for the Twitter source connector
+ id = "gearpump-external-twitter",
+ base = file("external/twitter"),
+ settings = commonSettings ++ javadocSettings ++
+ Seq(
+ libraryDependencies ++= Seq(
+ "org.twitter4j" % "twitter4j-stream" % "4.0.4" // Twitter4J streaming client library
+ )
+ ))
+ .dependsOn(core % "provided", streaming % "test->test; provided")
+ .disablePlugins(sbtassembly.AssemblyPlugin) // assembly plugin is switched off for this module
}
\ No newline at end of file