You are viewing a plain-text version of this content. The canonical link to it appeared as a hyperlink on the original page and is not reproduced in this text.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/09 21:35:44 UTC
[3/6] incubator-streams-examples git commit: flink examples building and running
flink examples building and running
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/0112a838
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/0112a838
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/0112a838
Branch: refs/heads/master
Commit: 0112a83874bb7f896a4e3964d5fde75e5967afe6
Parents: 4491cfe
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Thu Sep 29 19:15:39 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Thu Sep 29 19:15:39 2016 -0500
----------------------------------------------------------------------
flink/flink-twitter-collection/pom.xml | 127 +++++++++++------
.../TwitterSpritzerPipelineConfiguration.json | 29 ++++
.../FlinkTwitterFollowingPipeline.scala | 11 +-
.../collection/FlinkTwitterPostsPipeline.scala | 22 ++-
.../FlinkTwitterSpritzerPipeline.scala | 138 +++++++++++++++++++
.../FlinkTwitterUserInformationPipeline.scala | 28 ++--
.../TwitterSpritzerPipelineConfiguration.json | 29 ++++
.../FlinkTwitterFollowingPipeline.conf | 10 --
...linkTwitterFollowingPipelineFollowersIT.conf | 16 +++
.../FlinkTwitterFollowingPipelineFriendsIT.conf | 16 +++
.../resources/FlinkTwitterPostsPipeline.conf | 10 --
.../resources/FlinkTwitterPostsPipelineIT.conf | 15 ++
.../FlinkTwitterUserInformationPipeline.conf | 10 --
.../FlinkTwitterUserInformationPipelineIT.conf | 15 ++
.../test/FlinkTwitterFollowingPipelineIT.scala | 71 +++++-----
.../test/FlinkTwitterPostsPipelineIT.scala | 38 ++---
.../test/FlinkTwitterSpritzerPipelineIT.scala | 57 ++++++++
.../FlinkTwitterUserInformationPipelineIT.scala | 33 +++--
flink/pom.xml | 3 -
local/elasticsearch-hdfs/pom.xml | 14 +-
local/elasticsearch-reindex/pom.xml | 2 +-
local/mongo-elasticsearch-sync/pom.xml | 12 +-
local/twitter-follow-graph/pom.xml | 10 +-
local/twitter-history-elasticsearch/pom.xml | 14 +-
local/twitter-userstream-elasticsearch/pom.xml | 14 +-
25 files changed, 543 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/pom.xml
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml
index 33b05fe..2d35035 100644
--- a/flink/flink-twitter-collection/pom.xml
+++ b/flink/flink-twitter-collection/pom.xml
@@ -34,17 +34,38 @@
<description>Collects twitter documents using flink.</description>
<properties>
- <docker.repo>apachestreams</docker.repo>
+ <testng.version>6.9.10</testng.version>
<hdfs.version>2.7.0</hdfs.version>
<flink.version>1.1.2</flink.version>
+ <scala.version>2.10.6</scala.version>
+ <scalatest.version>2.2.5</scalatest.version>
<scala.suffix>2.10</scala.suffix>
+ <scala-maven.plugin.version>3.2.2</scala-maven.plugin.version>
</properties>
<dependencies>
<dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-all</artifactId>
- <version>1.3</version>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.suffix}</artifactId>
+ <version>${scalatest.version}</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -65,13 +86,11 @@
<groupId>org.apache.streams</groupId>
<artifactId>streams-util</artifactId>
<version>${project.version}</version>
- <type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
<version>${project.version}</version>
- <type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
@@ -277,6 +296,19 @@
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <version>${testng.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
@@ -293,17 +325,6 @@
</testResource>
</testResources>
<plugins>
- <plugin>
- <artifactId>maven-clean-plugin</artifactId>
- <configuration>
- <filesets>
- <fileset>
- <directory>data</directory>
- <followSymlinks>false</followSymlinks>
- </fileset>
- </filesets>
- </configuration>
- </plugin>
<!-- This binary runs with logback -->
<!-- Keep log4j out -->
<plugin>
@@ -334,8 +355,56 @@
</executions>
</plugin>
<plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>${scala-maven.plugin.version}</version>
+ <executions>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>add-source</goal>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.build.finalName}</finalName>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>**/logback.xml</exclude>
+ <exclude>**/log4j.properties</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
</plugin>
<plugin>
<groupId>org.jsonschema2pojo</groupId>
@@ -348,7 +417,6 @@
<sourcePath>src/main/jsonschema</sourcePath>
</sourcePaths>
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
- <targetPackage>org.apache.streams.example.elasticsearch</targetPackage>
<useJodaDates>false</useJodaDates>
</configuration>
<executions>
@@ -379,25 +447,6 @@
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <configuration>
- <includes>**/*.json</includes>
- <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
- <includeGroupIds>org.apache.streams</includeGroupIds>
- <includeTypes>test-jar</includeTypes>
- </configuration>
- <executions>
- <execution>
- <id>test-resource-dependencies</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>unpack-dependencies</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.12.4</version>
<executions>
@@ -410,10 +459,6 @@
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>io.fabric8</groupId>
- <artifactId>docker-maven-plugin</artifactId>
- </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json
new file mode 100644
index 0000000..49d0d1e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "FlinkStreamingConfiguration.json"
+ },
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration"
+ },
+ "source": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+ },
+ "destination": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "providerWaitMs": {
+ "type": "integer"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 2ac7d32..2fd9336 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -3,6 +3,7 @@ package org.apache.streams.examples.flink.twitter.collection
import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
import com.google.common.util.concurrent.Uninterruptibles
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.core.fs.FileSystem
@@ -17,10 +18,10 @@ import org.apache.streams.twitter.TwitterFollowingConfiguration
import org.apache.streams.twitter.pojo.Follow
import org.apache.streams.twitter.provider.TwitterFollowingProvider
import org.slf4j.{Logger, LoggerFactory}
-import org.apache.flink.api.scala._
import org.apache.streams.examples.flink.FlinkBase
import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
+import org.apache.flink.api.scala._
import scala.collection.JavaConversions._
@@ -75,6 +76,12 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
return false
}
+ Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
return true
}
@@ -134,7 +141,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
def collectConnections(id : String, out : Collector[StreamsDatum]) = {
val twitProvider: TwitterFollowingProvider =
new TwitterFollowingProvider(
- twitterConfiguration.withIdsOnly(true).withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration]
+ twitterConfiguration.withIdsOnly(true).withInfo(List(toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration]
)
twitProvider.prepare(twitProvider)
twitProvider.startStream()
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index f8e221c..beea973 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -3,11 +3,8 @@ package org.apache.streams.examples.flink.twitter.collection
import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
import com.google.common.util.concurrent.Uninterruptibles
-import com.peoplepattern.streams.pdb.pipelines.FlinkStreamingConfiguration
-import com.peoplepattern.streams.pdb.flink.{FlinkBase, FlinkUtil}
-import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration
-import com.peoplepattern.streams.twitter.collection.FlinkTwitterPostsPipeline.LOGGER
import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
@@ -33,6 +30,7 @@ import org.apache.streams.twitter.TwitterUserInformationConfiguration
import org.apache.streams.twitter.pojo.{Tweet, User}
import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
import org.slf4j.{Logger, LoggerFactory}
+import org.apache.flink.api.scala._
import scala.collection.JavaConversions._
@@ -84,6 +82,12 @@ object FlinkTwitterPostsPipeline extends FlinkBase {
return false
}
+ Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
return true
}
@@ -105,16 +109,8 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
val outPath = buildWriterPath(config.getDestination)
- //val inProps = buildKafkaProps(config.getSourceTopic)
-
val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids")
- //val idTopicIn = new KafkaSink()
-
-// val idTopicOut : DataStream[String] = env.addSource[String](
-// new org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09(config.getSourceTopic.getTopic, new SimpleStringSchema(),
-// inProps));
-
val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).name("keyed_ids").keyBy( id => (id.hashCode % 100).abs )
// these datums contain 'Tweet' objects
@@ -149,7 +145,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
val twitterConfiguration = config.getTwitter
val twitProvider: TwitterTimelineProvider =
new TwitterTimelineProvider(
- twitterConfiguration.withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(200l)
+ twitterConfiguration.withInfo(List(toProviderId(id))).withMaxItems(200l)
)
twitProvider.prepare(twitProvider)
twitProvider.startStream()
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
new file mode 100644
index 0000000..b615806
--- /dev/null
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -0,0 +1,138 @@
+package org.apache.streams.examples.flink.twitter.collection
+
+import java.util.concurrent.TimeUnit
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
+import com.google.common.util.concurrent.Uninterruptibles
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.connectors.fs.RollingSink
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
+import org.apache.streams.flink.FlinkStreamingConfiguration
+import org.apache.streams.jackson.StreamsJacksonMapper
+import org.apache.streams.twitter.TwitterStreamConfiguration
+import org.apache.streams.twitter.provider.TwitterStreamProvider
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.flink.api.scala._
+
+import scala.collection.JavaConversions._
+
+/**
+ * Created by sblackmon on 7/29/15.
+ */
+object FlinkTwitterSpritzerPipeline extends FlinkBase {
+
+ val STREAMS_ID: String = "FlinkTwitterSpritzerPipeline"
+
+ private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipeline])
+ private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+
+ override def main(args: Array[String]) = {
+ super.main(args)
+ val jobConfig = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(typesafe)
+ if( setup(jobConfig) == false ) System.exit(1)
+ val pipeline: FlinkTwitterSpritzerPipeline = new FlinkTwitterSpritzerPipeline(jobConfig)
+ val thread = new Thread(pipeline)
+ thread.start()
+ thread.join()
+ }
+
+ def setup(jobConfig: TwitterSpritzerPipelineConfiguration): Boolean = {
+
+ LOGGER.info("TwitterSpritzerPipelineConfiguration: " + jobConfig)
+
+ if( jobConfig == null ) {
+ LOGGER.error("jobConfig is null!")
+ System.err.println("jobConfig is null!")
+ return false
+ }
+
+ if( jobConfig.getDestination == null ) {
+ LOGGER.error("jobConfig.getDestination is null!")
+ System.err.println("jobConfig.getDestination is null!")
+ return false
+ }
+
+ if( jobConfig.getTwitter == null ) {
+ LOGGER.error("jobConfig.getTwitter is null!")
+ System.err.println("jobConfig.getTwitter is null!")
+ return false
+ }
+
+ Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
+ return true
+
+ }
+
+}
+
+class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
+
+ import FlinkTwitterSpritzerPipeline._
+
+ override def run(): Unit = {
+
+ val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.setNumberOfExecutionRetries(0)
+
+ val outPath = buildWriterPath(config.getDestination)
+
+ val streamSource : DataStream[String] = env.addSource(new SpritzerSource(config.getTwitter));
+
+ if( config.getTest == false )
+ streamSource.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
+ else
+ streamSource.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+ .setParallelism(env.getParallelism);
+
+ // if( test == true ) jsons.print();
+
+ env.execute("FlinkTwitterPostsPipeline")
+ }
+
+ class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable {
+
+ var twitProvider: TwitterStreamProvider = _
+
+ @throws[Exception]
+ override def open(parameters: Configuration): Unit = {
+ twitProvider = new TwitterStreamProvider( sourceConfig )
+ twitProvider.prepare(twitProvider)
+ twitProvider.startStream()
+ }
+
+ override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
+ var iterator: Iterator[StreamsDatum] = null
+ do {
+ Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS)
+ iterator = twitProvider.readCurrent().iterator()
+ iterator.toList.map(datum => ctx.collect(datum.getDocument.asInstanceOf[String]))
+ } while( twitProvider.isRunning )
+ }
+
+ override def cancel(): Unit = {
+ twitProvider.cleanUp()
+ }
+
+ @throws[Exception]
+ override def close(): Unit = {
+ twitProvider.cleanUp()
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index a081c74..867255d 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -1,28 +1,18 @@
package org.apache.streams.examples.flink.twitter.collection
-import java.lang
import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.base.{Preconditions, Strings}
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows}
-
-import scala.collection.JavaConversions._
-import com.peoplepattern.streams.twitter.collection.FlinkTwitterUserInformationPipeline.LOGGER
import com.google.common.util.concurrent.Uninterruptibles
-import org.apache.streams.examples.flink.FlinkBase
import org.apache.flink.api.common.functions.RichFlatMapFunction
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers._
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.connectors.fs.RollingSink
import org.apache.flink.util.Collector
import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
@@ -30,13 +20,13 @@ import org.apache.streams.core.StreamsDatum
import org.apache.streams.examples.flink.FlinkBase
import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
import org.apache.streams.flink.FlinkStreamingConfiguration
-import org.apache.streams.hdfs.HdfsConfiguration
import org.apache.streams.jackson.StreamsJacksonMapper
-import org.apache.streams.twitter.TwitterUserInformationConfiguration
-import org.apache.streams.twitter.pojo.{Tweet, User}
-import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.apache.streams.twitter.pojo.User
+import org.apache.streams.twitter.provider.TwitterUserInformationProvider
import org.slf4j.{Logger, LoggerFactory}
+import scala.collection.JavaConversions._
+
/**
* Created by sblackmon on 3/15/16.
*/
@@ -85,6 +75,12 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase {
return false
}
+ Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+
return true
}
@@ -137,7 +133,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {if( input.size > 0 )
- out.collect(input.map(id => FlinkUtil.toProviderId(id)).toList)
+ out.collect(input.map(id => toProviderId(id)).toList)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json
new file mode 100644
index 0000000..49d0d1e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "type": "object",
+ "javaType" : "org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "FlinkStreamingConfiguration.json"
+ },
+ "properties": {
+ "twitter": {
+ "type": "object",
+ "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration"
+ },
+ "source": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+ },
+ "destination": {
+ "type": "object",
+ "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+ },
+ "providerWaitMs": {
+ "type": "integer"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
deleted file mode 100644
index e74f00c..0000000
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf
+++ /dev/null
@@ -1,10 +0,0 @@
-twitter {
- endpoint = followers
- version = 1.1
- oauth {
- consumerKey = ""
- consumerSecret = ""
- accessToken = ""
- accessTokenSecret = ""
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
new file mode 100644
index 0000000..87057be
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf
@@ -0,0 +1,16 @@
+source {
+ fields = ["ID"]
+ scheme = file
+ path = "target/test-classes"
+ readerPath = "asf.txt"
+}
+destination {
+ fields = ["DOC"]
+ scheme = file
+ path = "target/test-classes"
+ writerPath = "FlinkTwitterFollowingPipelineFollowersIT"
+}
+twitter.endpoint = friends
+providerWaitMs = 1000
+local = true
+test = true
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
new file mode 100644
index 0000000..b5212ed
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf
@@ -0,0 +1,16 @@
+source {
+ fields = ["ID"]
+ scheme = file
+ path = "target/test-classes"
+ readerPath = "asf.txt"
+}
+destination {
+ fields = ["DOC"]
+ scheme = file
+ path = "target/test-classes"
+ writerPath = "FlinkTwitterFollowingPipelineFriendsIT"
+}
+twitter.endpoint = friends
+providerWaitMs = 1000
+local = true
+test = true
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
deleted file mode 100644
index 63a6481..0000000
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf
+++ /dev/null
@@ -1,10 +0,0 @@
-twitter {
- version = 1.1
- endpoint = statuses
- oauth {
- consumerKey = ""
- consumerSecret = ""
- accessToken = ""
- accessTokenSecret = ""
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
new file mode 100644
index 0000000..6954113
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf
@@ -0,0 +1,15 @@
+source {
+ fields = ["ID"]
+ scheme = file
+ path = "target/test-classes"
+ readerPath = "asf.txt"
+}
+destination {
+ fields = ["DOC"]
+ scheme = file
+ path = "target/test-classes"
+ writerPath = "FlinkTwitterPostsPipelineIT"
+}
+providerWaitMs = 1000
+local = true
+test = true
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
deleted file mode 100644
index 6e0a879..0000000
--- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf
+++ /dev/null
@@ -1,10 +0,0 @@
-twitter {
- version = 1.1
- endpoint = users
- oauth {
- consumerKey = ""
- consumerSecret = ""
- accessToken = ""
- accessTokenSecret = ""
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
new file mode 100644
index 0000000..342a850
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf
@@ -0,0 +1,16 @@
+source {
+ fields = ["ID"]
+ scheme = file
+ path = "target/test-classes"
+ # FlinkTwitterUserInformationPipelineIT asserts > 500 output lines, so the input id list must be large.
+ readerPath = "1000twitterids.txt"
+}
+destination {
+ fields = ["DOC"]
+ scheme = file
+ path = "target/test-classes"
+ writerPath = "FlinkTwitterUserInformationPipelineIT"
+}
+providerWaitMs = 1000
+local = true
+test = true
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
index aa2b1a9..b051e90 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala
@@ -1,17 +1,22 @@
package com.peoplepattern.streams.twitter.collection
+import java.io.File
import java.nio.file.{Files, Paths}
-import com.peoplepattern.streams.pipelines.pdb.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
+import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline}
import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.SpanSugar._
import org.slf4j.{Logger, LoggerFactory}
import org.testng.annotations.Test
import scala.io.Source
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.{Seconds, Span}
+import org.scalatest.time.SpanSugar._
/**
* Created by sblackmon on 3/13/16.
@@ -20,30 +25,31 @@ class FlinkTwitterFollowingPipelineIT extends FlatSpec {
private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineIT])
+ import FlinkTwitterFollowingPipeline._
+
@Test
def flinkTwitterFollowersPipelineFriendsIT = {
- val testConfig : TwitterFollowingPipelineConfiguration =
- new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
- testConfig.getTwitter.setEndpoint("friends")
- val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
- source.setPath("target/test-classes")
- testConfig.setSource(source);
- val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
- destination.setPath("target/test-classes")
- testConfig.setDestination(destination)
- testConfig.setProviderWaitMs(1000l)
- testConfig.setTest(true)
+ val reference: Config = ConfigFactory.load()
+ val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFriendsIT.conf")
+ assert(conf_file.exists())
+ val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+ val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+ val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+ setup(testConfig)
val job = new FlinkTwitterFollowingPipeline(config = testConfig)
val jobThread = new Thread(job)
jobThread.start
jobThread.join
- eventually (timeout(30 seconds), interval(1 seconds)) {
- assert(Files.exists(Paths.get("target/test-classes/pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends")))
+ eventually (timeout(60 seconds), interval(1 seconds)) {
+ assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
assert(
- Source.fromFile("target/test-classes/pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends", "UTF-8").getLines.size
+ Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
> 90)
}
@@ -52,27 +58,26 @@ class FlinkTwitterFollowingPipelineIT extends FlatSpec {
@Test
def flinkTwitterFollowersPipelineFollowersIT = {
- val testConfig : TwitterFollowingPipelineConfiguration =
- new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
- testConfig.getTwitter.setEndpoint("followers")
- val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
- source.setPath("target/test-classes")
- testConfig.setSource(source);
- val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterFollowingPipeline/followers").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
- destination.setPath("target/test-classes")
- testConfig.setDestination(destination)
- testConfig.setProviderWaitMs(1000l)
- testConfig.setTest(true)
+ val reference: Config = ConfigFactory.load()
+ val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf")
+ assert(conf_file.exists())
+ val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+ val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+ val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
+
+ setup(testConfig)
val job = new FlinkTwitterFollowingPipeline(config = testConfig)
val jobThread = new Thread(job)
jobThread.start
jobThread.join
- eventually (timeout(30 seconds), interval(1 seconds)) {
- assert(Files.exists(Paths.get("target/test-classes/FlinkTwitterFollowingPipeline/followers")))
+ eventually (timeout(60 seconds), interval(1 seconds)) {
+ assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
assert(
- Source.fromFile("target/test-classes/FlinkTwitterFollowingPipeline/followers", "UTF-8").getLines.size
+ Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
> 500)
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
index 8a942e5..a355696 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -1,12 +1,15 @@
package com.peoplepattern.streams.twitter.collection
+import java.io.File
import java.nio.file.{Files, Paths}
import java.util.concurrent.TimeUnit
import com.google.common.util.concurrent.{Monitor, Uninterruptibles}
-import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.examples.flink.twitter.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
+import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterPostsPipeline, FlinkTwitterUserInformationPipeline}
import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
import org.slf4j.{Logger, LoggerFactory}
@@ -20,23 +23,25 @@ import org.testng.annotations.Test
/**
* Created by sblackmon on 3/13/16.
*/
-class FlinkTwitterPostsPipelineIT extends FlatSpec {
+class FlinkTwitterPostsPipelineIT extends FlatSpec {
private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipelineIT])
+ import FlinkTwitterPostsPipeline._
+
@Test
def flinkTwitterPostsPipelineIT = {
- val testConfig : TwitterPostsPipelineConfiguration =
- new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
- val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
- source.setPath("target/test-classes")
- testConfig.setSource(source);
- val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterPostsPipeline").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
- destination.setPath("target/test-classes")
- testConfig.setDestination(destination)
- testConfig.setProviderWaitMs(1000l)
- testConfig.setTest(true)
+ val reference: Config = ConfigFactory.load()
+ val conf_file: File = new File("target/test-classes/FlinkTwitterPostsPipelineIT.conf")
+ assert(conf_file.exists())
+ val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+ val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+ val testConfig = new ComponentConfigurator(classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafe)
+
+ setup(testConfig)
val job = new FlinkTwitterPostsPipeline(config = testConfig)
val jobThread = new Thread(job)
@@ -44,9 +49,9 @@ class FlinkTwitterPostsPipelineIT extends FlatSpec {
jobThread.join
eventually (timeout(30 seconds), interval(1 seconds)) {
- assert(Files.exists(Paths.get("target/test-classes/FlinkTwitterPostsPipeline")))
+ assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
assert(
- Source.fromFile("target/test-classes/FlinkTwitterPostsPipeline", "UTF-8").getLines.size
+ Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
>= 200)
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
new file mode 100644
index 0000000..f083f65
--- /dev/null
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -0,0 +1,69 @@
+package org.apache.streams.examples.flink.twitter.test
+
+import java.io.File
+import java.nio.file.{Files, Path, Paths}
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
+import org.slf4j.{Logger, LoggerFactory}
+import org.testng.annotations.Test
+
+import scala.io.Source
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+/**
+ * Integration test for FlinkTwitterSpritzerPipeline.
+ *
+ * Disabled via `@Test(enabled = false)`. When enabled, it layers
+ * target/test-classes/FlinkTwitterSpritzerPipelineIT.conf over the default
+ * typesafe config, runs the pipeline, and checks the destination file.
+ *
+ * Created by sblackmon on 3/13/16.
+ */
+class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
+
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterSpritzerPipelineIT])
+
+  // Import only this pipeline's companion members so setup(...) cannot
+  // resolve to a different pipeline's helper.
+  import FlinkTwitterSpritzerPipeline._
+
+  @Test(enabled = false)
+  def flinkTwitterSpritzerPipelineIT = {
+
+    val reference: Config = ConfigFactory.load()
+    val conf_file: File = new File("target/test-classes/FlinkTwitterSpritzerPipelineIT.conf")
+    assert(conf_file.exists())
+    val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false))
+
+    // Values from the test resource take precedence over the loaded default config.
+    val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+    val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+    val testConfig = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(typesafe)
+
+    setup(testConfig)
+
+    // NOTE(review): join blocks until the job's run() returns; confirm the
+    // spritzer job terminates in test mode before enabling this test.
+    val job = new FlinkTwitterSpritzerPipeline(config = testConfig)
+    val jobThread = new Thread(job)
+    jobThread.start
+    jobThread.join
+
+    val outPath: Path = Paths.get(testConfig.getDestination.getPath, testConfig.getDestination.getWriterPath)
+
+    eventually (timeout(30 seconds), interval(1 seconds)) {
+      assert(Files.exists(outPath))
+      // Close the reader on every poll so retries do not leak file handles.
+      val source = Source.fromFile(outPath.toFile, "UTF-8")
+      val lineCount = try source.getLines.size finally source.close()
+      assert(lineCount >= 200)
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
index 3d21244..2ca8650 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -1,10 +1,13 @@
package com.peoplepattern.streams.twitter.collection
+import java.io.File
import java.nio.file.{Files, Paths}
-import com.peoplepattern.streams.pipelines.pdb.{TwitterPostsPipelineConfiguration, TwitterUserInformationPipelineConfiguration}
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
+import org.apache.streams.examples.flink.twitter.{TwitterSpritzerPipelineConfiguration, TwitterUserInformationPipelineConfiguration}
import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
+import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
import org.scalatest.FlatSpec
import org.scalatest._
@@ -25,19 +28,21 @@ class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipelineIT])
+ import FlinkTwitterUserInformationPipeline._
+
@Test
def flinkTwitterUserInformationPipelineIT = {
- val testConfig : TwitterUserInformationPipelineConfiguration =
- new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
- val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("1000twitterids.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration]
- source.setPath("target/test-classes")
- testConfig.setSource(source);
- val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/TwitterUserInformationPipeline").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration]
- destination.setPath("target/test-classes")
- testConfig.setDestination(destination)
- testConfig.setProviderWaitMs(1000l)
- testConfig.setTest(true)
+ val reference: Config = ConfigFactory.load()
+ val conf_file: File = new File("target/test-classes/FlinkTwitterUserInformationPipelineIT.conf")
+ assert(conf_file.exists())
+ val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ val typesafe: Config = testResourceConfig.withFallback(reference).resolve()
+ val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe)
+ val testConfig = new ComponentConfigurator(classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe)
+
+ setup(testConfig)
val job = new FlinkTwitterUserInformationPipeline(config = testConfig)
val jobThread = new Thread(job)
@@ -45,9 +50,9 @@ class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
jobThread.join
eventually (timeout(30 seconds), interval(1 seconds)) {
- assert(Files.exists(Paths.get("target/test-classes/TwitterUserInformationPipeline")))
+ assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
assert(
- Source.fromFile("target/test-classes/TwitterUserInformationPipeline", "UTF-8").getLines.size
+ Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
> 500)
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index 7054e89..6c50ca2 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -41,7 +41,4 @@
<module>flink-twitter-collection</module>
</modules>
- <build>
-
- </build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/elasticsearch-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/pom.xml b/local/elasticsearch-hdfs/pom.xml
index 7b653fc..52cd0fc 100644
--- a/local/elasticsearch-hdfs/pom.xml
+++ b/local/elasticsearch-hdfs/pom.xml
@@ -67,7 +67,7 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-core</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
@@ -76,34 +76,34 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-config</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-util</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-runtime-local</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-elasticsearch</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-hdfs</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/elasticsearch-reindex/pom.xml
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/pom.xml b/local/elasticsearch-reindex/pom.xml
index e81cbe2..325e564 100644
--- a/local/elasticsearch-reindex/pom.xml
+++ b/local/elasticsearch-reindex/pom.xml
@@ -92,7 +92,7 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-elasticsearch</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/mongo-elasticsearch-sync/pom.xml
----------------------------------------------------------------------
diff --git a/local/mongo-elasticsearch-sync/pom.xml b/local/mongo-elasticsearch-sync/pom.xml
index 318c47e..d268ed7 100644
--- a/local/mongo-elasticsearch-sync/pom.xml
+++ b/local/mongo-elasticsearch-sync/pom.xml
@@ -66,7 +66,7 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-core</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
@@ -75,27 +75,27 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-config</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-runtime-local</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-elasticsearch</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-mongo</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/twitter-follow-graph/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-follow-graph/pom.xml b/local/twitter-follow-graph/pom.xml
index d40adde..9bf980d 100644
--- a/local/twitter-follow-graph/pom.xml
+++ b/local/twitter-follow-graph/pom.xml
@@ -49,17 +49,17 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-config</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-runtime-local</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-provider-twitter</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
@@ -70,12 +70,12 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-graph</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/twitter-history-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-history-elasticsearch/pom.xml b/local/twitter-history-elasticsearch/pom.xml
index afc8cf0..ba6dbe7 100644
--- a/local/twitter-history-elasticsearch/pom.xml
+++ b/local/twitter-history-elasticsearch/pom.xml
@@ -69,7 +69,7 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-core</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
@@ -78,29 +78,29 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-config</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-util</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-runtime-local</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-provider-twitter</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
@@ -111,7 +111,7 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-elasticsearch</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/twitter-userstream-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/local/twitter-userstream-elasticsearch/pom.xml b/local/twitter-userstream-elasticsearch/pom.xml
index 224bdd4..1b7b64f 100644
--- a/local/twitter-userstream-elasticsearch/pom.xml
+++ b/local/twitter-userstream-elasticsearch/pom.xml
@@ -67,7 +67,7 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-core</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
@@ -76,32 +76,32 @@
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-config</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-runtime-local</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-filters</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-provider-twitter</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-persist-elasticsearch</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
<artifactId>streams-pojo</artifactId>
- <version>0.3-incubating</version>
+ <version>0.4-incubating-SNAPSHOT</version>
<type>test-jar</type>
</dependency>
<dependency>