You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/12/17 19:59:01 UTC
[8/9] incubator-streams-examples git commit: STREAMS-474: Apply check
style requirements to examples
STREAMS-474: Apply check style requirements to examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/cb7c0b9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/cb7c0b9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/cb7c0b9d
Branch: refs/heads/master
Commit: cb7c0b9de4abcbd804e142a892d55cacb998a8f0
Parents: c4821b6
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Thu Dec 15 17:39:14 2016 -0600
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Thu Dec 15 17:39:14 2016 -0600
----------------------------------------------------------------------
.../streams/examples/flink/FlinkBase.scala | 23 +--
.../FlinkTwitterFollowingPipeline.scala | 174 ++++++++++---------
.../collection/FlinkTwitterPostsPipeline.scala | 28 ++-
.../FlinkTwitterSpritzerPipeline.scala | 10 +-
.../FlinkTwitterUserInformationPipeline.scala | 15 +-
...inkTwitterFollowingPipelineFollowersIT.scala | 2 +-
...FlinkTwitterFollowingPipelineFriendsIT.scala | 16 +-
.../test/FlinkTwitterPostsPipelineIT.scala | 16 +-
.../test/FlinkTwitterSpritzerPipelineIT.scala | 14 +-
.../FlinkTwitterUserInformationPipelineIT.scala | 16 +-
.../streams/example/ElasticsearchHdfs.java | 63 ++++---
.../streams/example/HdfsElasticsearch.java | 63 ++++---
.../example/test/ElasticsearchHdfsIT.java | 69 ++++----
.../example/test/HdfsElasticsearchIT.java | 77 ++++----
.../src/test/resources/ElasticsearchHdfsIT.conf | 1 +
.../src/test/resources/HdfsElasticsearchIT.conf | 3 +-
.../streams/example/ElasticsearchReindex.java | 58 +++----
.../test/ElasticsearchReindexChildIT.java | 79 +++++----
.../example/test/ElasticsearchReindexIT.java | 76 ++++----
.../test/ElasticsearchReindexParentIT.java | 89 +++++-----
.../resources/ElasticsearchReindexChildIT.conf | 1 +
.../test/resources/ElasticsearchReindexIT.conf | 1 +
.../resources/ElasticsearchReindexParentIT.conf | 1 +
.../streams/example/MongoElasticsearchSync.java | 61 +++----
.../example/test/MongoElasticsearchSyncIT.java | 71 ++++----
.../resources/MongoElasticsearchSyncIT.conf | 1 +
.../streams/example/TwitterFollowNeo4j.java | 73 ++++----
.../example/test/TwitterFollowNeo4jIT.java | 44 +++--
.../test/resources/TwitterFollowNeo4jIT.conf | 3 +-
.../example/TwitterHistoryElasticsearch.java | 61 +++----
.../test/TwitterHistoryElasticsearchIT.java | 76 ++++----
.../TwitterHistoryElasticsearchIT.conf | 2 +-
.../example/TwitterUserstreamElasticsearch.java | 140 +++++++--------
.../test/TwitterUserstreamElasticsearchIT.java | 78 ++++-----
.../TwitterUserstreamElasticsearchIT.conf | 3 +-
pom.xml | 78 +++++++++
36 files changed, 821 insertions(+), 765 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index 49ca5b9..b6d806c 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -32,6 +32,9 @@ import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, Hdfs
import org.apache.streams.jackson.StreamsJacksonMapper
import org.slf4j.LoggerFactory
+/**
+ * FlinkBase is a base trait with capabilities common to all of the streams flink examples.
+ */
trait FlinkBase {
private val BASELOGGER = LoggerFactory.getLogger("FlinkBase")
@@ -101,16 +104,16 @@ trait FlinkBase {
}
}
-// def setup(typesafe: Config): Boolean = {
-//
-// val streamsConfig = StreamsConfigurator.detectConfiguration(typesafe)
-//
-// this.streamsConfig = streamsConfig
-//
-// BASELOGGER.info("Streams Config: " + streamsConfig)
-//
-// setup(streamsConfig)
-// }
+ // def setup(typesafe: Config): Boolean = {
+ //
+ // val streamsConfig = StreamsConfigurator.detectConfiguration(typesafe)
+ //
+ // this.streamsConfig = streamsConfig
+ //
+ // BASELOGGER.info("Streams Config: " + streamsConfig)
+ //
+ // setup(streamsConfig)
+ // }
def setupStreaming(streamingConfiguration: FlinkStreamingConfiguration): Boolean = {
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index a5a4f72..17246e5 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.{Preconditions, Strings}
import com.google.common.util.concurrent.Uninterruptibles
import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
@@ -31,26 +32,29 @@ import org.apache.flink.streaming.connectors.fs.RollingSink
import org.apache.flink.util.Collector
import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
import org.apache.streams.core.StreamsDatum
+import org.apache.streams.examples.flink.FlinkBase
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
import org.apache.streams.jackson.StreamsJacksonMapper
import org.apache.streams.twitter.TwitterFollowingConfiguration
import org.apache.streams.twitter.pojo.Follow
import org.apache.streams.twitter.provider.TwitterFollowingProvider
import org.slf4j.{Logger, LoggerFactory}
-import org.apache.streams.examples.flink.FlinkBase
-import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
-import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration}
-import org.apache.flink.api.scala._
import scala.collection.JavaConversions._
+/**
+ * FlinkTwitterFollowingPipeline collects friends or followers of all profiles from a
+ * set of IDs, writing each connection as a twitter:follow in json format to dfs.
+ */
object FlinkTwitterFollowingPipeline extends FlinkBase {
- val STREAMS_ID: String = "FlinkTwitterFollowingPipeline"
+ val STREAMS_ID: String = "FlinkTwitterFollowingPipeline"
- private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipeline])
- private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
+ private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipeline])
+ private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance()
- override def main(args: Array[String]) = {
+ override def main(args: Array[String]) = {
super.main(args)
val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
if( !setup(jobConfig) ) System.exit(1)
@@ -58,111 +62,111 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
val thread = new Thread(pipeline)
thread.start()
thread.join()
- }
+ }
- def setup(jobConfig: TwitterFollowingPipelineConfiguration): Boolean = {
+ def setup(jobConfig: TwitterFollowingPipelineConfiguration): Boolean = {
- LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig)
+ LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig)
- if( jobConfig == null ) {
- LOGGER.error("jobConfig is null!")
- System.err.println("jobConfig is null!")
- return false
- }
+ if( jobConfig == null ) {
+ LOGGER.error("jobConfig is null!")
+ System.err.println("jobConfig is null!")
+ return false
+ }
- if( jobConfig.getSource == null ) {
- LOGGER.error("jobConfig.getSource is null!")
- System.err.println("jobConfig.getSource is null!")
- return false
- }
+ if( jobConfig.getSource == null ) {
+ LOGGER.error("jobConfig.getSource is null!")
+ System.err.println("jobConfig.getSource is null!")
+ return false
+ }
- if( jobConfig.getDestination == null ) {
- LOGGER.error("jobConfig.getDestination is null!")
- System.err.println("jobConfig.getDestination is null!")
- return false
- }
+ if( jobConfig.getDestination == null ) {
+ LOGGER.error("jobConfig.getDestination is null!")
+ System.err.println("jobConfig.getDestination is null!")
+ return false
+ }
- if( jobConfig.getTwitter == null ) {
- LOGGER.error("jobConfig.getTwitter is null!")
- System.err.println("jobConfig.getTwitter is null!")
- return false
- }
+ if( jobConfig.getTwitter == null ) {
+ LOGGER.error("jobConfig.getTwitter is null!")
+ System.err.println("jobConfig.getTwitter is null!")
+ return false
+ }
- Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
- Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
- Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
- Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
- Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
+ Preconditions.checkNotNull(jobConfig.getTwitter.getOauth)
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
- true
+ true
- }
+ }
}
class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguration = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable {
- import FlinkTwitterFollowingPipeline._
+ import FlinkTwitterFollowingPipeline._
+
+ override def run(): Unit = {
+
+ val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
- override def run(): Unit = {
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+ env.setNumberOfExecutionRetries(0)
- val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
+ val inPath = buildReaderPath(config.getSource)
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
- env.setNumberOfExecutionRetries(0)
+ val outPath = buildWriterPath(config.getDestination)
- val inPath = buildReaderPath(config.getSource)
+ val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs )
- val outPath = buildWriterPath(config.getDestination)
+ // these datums contain 'Follow' objects
+ val followDatums: DataStream[StreamsDatum] =
+ keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10)
- val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs )
+ val follows: DataStream[Follow] = followDatums
+ .map(datum => datum.getDocument.asInstanceOf[Follow])
- // these datums contain 'Follow' objects
- val followDatums: DataStream[StreamsDatum] =
- keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10)
+ val jsons: DataStream[String] = follows
+ .map(follow => {
+ val MAPPER = StreamsJacksonMapper.getInstance
+ MAPPER.writeValueAsString(follow)
+ })
- val follows: DataStream[Follow] = followDatums
- .map(datum => datum.getDocument.asInstanceOf[Follow])
+ if( config.getTest == false )
+ jsons.addSink(new RollingSink[String](outPath)).setParallelism(3)
+ else
+ jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
+ .setParallelism(env.getParallelism)
- val jsons: DataStream[String] = follows
- .map(follow => {
- val MAPPER = StreamsJacksonMapper.getInstance
- MAPPER.writeValueAsString(follow)
- })
+ // if( test == true ) jsons.print();
- if( config.getTest == false )
- jsons.addSink(new RollingSink[String](outPath)).setParallelism(3)
- else
- jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
- .setParallelism(env.getParallelism)
+ env.execute(STREAMS_ID)
+ }
- // if( test == true ) jsons.print();
+ class FollowingCollectorFlatMapFunction(
+ twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator[TwitterFollowingConfiguration](classOf[TwitterFollowingConfiguration]).detectConfiguration(StreamsConfigurator.getConfig.getConfig("twitter")),
+ flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator[StreamsFlinkConfiguration](classOf[StreamsFlinkConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
+ ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
- env.execute(STREAMS_ID)
+ override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
+ collectConnections(input, out)
}
- class FollowingCollectorFlatMapFunction(
- twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator[TwitterFollowingConfiguration](classOf[TwitterFollowingConfiguration]).detectConfiguration(StreamsConfigurator.getConfig.getConfig("twitter")),
- flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator[StreamsFlinkConfiguration](classOf[StreamsFlinkConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)
- ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
-
- override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
- collectConnections(input, out)
- }
-
- def collectConnections(id : String, out : Collector[StreamsDatum]) = {
- val twitProvider: TwitterFollowingProvider =
- new TwitterFollowingProvider(
- twitterConfiguration.withIdsOnly(true).withInfo(List(toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration]
- )
- twitProvider.prepare(twitProvider)
- twitProvider.startStream()
- var iterator: Iterator[StreamsDatum] = null
- do {
- Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
- twitProvider.readCurrent().iterator().toList.map(out.collect(_))
- } while( twitProvider.isRunning )
- }
+ def collectConnections(id : String, out : Collector[StreamsDatum]) = {
+ val twitProvider: TwitterFollowingProvider =
+ new TwitterFollowingProvider(
+ twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration]
+ )
+ twitProvider.prepare(twitProvider)
+ twitProvider.startStream()
+ var iterator: Iterator[StreamsDatum] = null
+ do {
+ Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS)
+ twitProvider.readCurrent().iterator().toList.map(out.collect(_))
+ } while( twitProvider.isRunning )
}
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index 8bb2997..549e048 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -23,35 +23,29 @@ import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.{Preconditions, Strings}
import com.google.common.util.concurrent.Uninterruptibles
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.restartstrategy.RestartStrategies
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.function.AllWindowFunction
-import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.fs.RollingSink
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
import org.apache.streams.core.StreamsDatum
import org.apache.streams.examples.flink.FlinkBase
import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
import org.apache.streams.flink.FlinkStreamingConfiguration
-import org.apache.streams.hdfs.HdfsConfiguration
import org.apache.streams.jackson.StreamsJacksonMapper
-import org.apache.streams.twitter.TwitterUserInformationConfiguration
-import org.apache.streams.twitter.pojo.{Tweet, User}
-import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider}
+import org.apache.streams.twitter.pojo.Tweet
+import org.apache.streams.twitter.provider.TwitterTimelineProvider
import org.slf4j.{Logger, LoggerFactory}
-import org.apache.flink.api.scala._
import scala.collection.JavaConversions._
+/**
+ * FlinkTwitterPostsPipeline collects recent posts from all profiles from a
+ * set of IDs, writing each post as a twitter:status in json format to dfs.
+ */
object FlinkTwitterPostsPipeline extends FlinkBase {
val STREAMS_ID: String = "FlinkTwitterPostsPipeline"
@@ -154,13 +148,13 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable {
override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = {
- collectPosts(input, out)
+ collectPosts(input, out)
}
def collectPosts(id : String, out : Collector[StreamsDatum]) = {
val twitterConfiguration = config.getTwitter
val twitProvider: TwitterTimelineProvider =
new TwitterTimelineProvider(
- twitterConfiguration.withInfo(List(toProviderId(id))).withMaxItems(200l)
+ twitterConfiguration.withInfo(List(toProviderId(id)))
)
twitProvider.prepare(twitProvider)
twitProvider.startStream()
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index 56d892b..dbb8a33 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -25,11 +25,12 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.{Preconditions, Strings}
import com.google.common.util.concurrent.Uninterruptibles
import org.apache.flink.api.common.functions.StoppableFunction
+import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
-import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.fs.RollingSink
import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
import org.apache.streams.core.StreamsDatum
@@ -38,13 +39,16 @@ import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfigur
import org.apache.streams.flink.FlinkStreamingConfiguration
import org.apache.streams.jackson.StreamsJacksonMapper
import org.apache.streams.twitter.TwitterStreamConfiguration
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat
import org.apache.streams.twitter.provider.TwitterStreamProvider
import org.slf4j.{Logger, LoggerFactory}
-import org.apache.flink.api.scala._
-import org.apache.streams.twitter.converter.TwitterDateTimeFormat
import scala.collection.JavaConversions._
+/**
+ * FlinkTwitterSpritzerPipeline opens a spritzer stream and writes
+ * each post received as a twitter:status in json format to dfs.
+ */
object FlinkTwitterSpritzerPipeline extends FlinkBase {
val STREAMS_ID: String = "FlinkTwitterSpritzerPipeline"
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 01425f6..c180089 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -22,15 +22,14 @@ import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.{Preconditions, Strings}
-import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction}
-import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows}
import com.google.common.util.concurrent.Uninterruptibles
import org.apache.flink.api.common.functions.RichFlatMapFunction
-import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
-import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
import org.apache.flink.api.scala._
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.function.WindowFunction
+import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.streaming.connectors.fs.RollingSink
import org.apache.flink.util.Collector
import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
@@ -45,6 +44,10 @@ import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConversions._
+/**
+ * FlinkTwitterUserInformationPipeline collects the current user profile of a
+ * set of IDs, writing each as a twitter:user in json format to dfs.
+ */
object FlinkTwitterUserInformationPipeline extends FlinkBase {
val STREAMS_ID: String = "FlinkTwitterUserInformationPipeline"
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
index 6cf8d9d..be22b82 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
@@ -34,7 +34,7 @@ import org.testng.annotations.Test
import scala.io.Source
/**
- * Created by sblackmon on 3/13/16.
+ * FlinkTwitterFollowingPipelineFollowersIT is an integration test for FlinkTwitterFollowingPipeline.
*/
class FlinkTwitterFollowingPipelineFollowersIT extends FlatSpec {
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
index 8ea7f9c..9829ebc 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
@@ -21,24 +21,20 @@ package com.peoplepattern.streams.twitter.collection
import java.io.File
import java.nio.file.{Files, Paths}
-import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
-import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
-import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline}
-import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
-import org.junit.Ignore
+import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
import org.slf4j.{Logger, LoggerFactory}
import org.testng.annotations.Test
import scala.io.Source
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
-import org.scalatest.time.SpanSugar._
/**
- * Created by sblackmon on 3/13/16.
+ * FlinkTwitterFollowingPipelineFriendsIT is an integration test for FlinkTwitterFollowingPipeline.
*/
class FlinkTwitterFollowingPipelineFriendsIT extends FlatSpec {
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
index 7113c4c..987e82d 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -20,27 +20,21 @@ package com.peoplepattern.streams.twitter.collection
import java.io.File
import java.nio.file.{Files, Paths}
-import java.util.concurrent.TimeUnit
-import com.google.common.util.concurrent.{Monitor, Uninterruptibles}
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.examples.flink.twitter.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration}
import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
-import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
-import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterPostsPipeline, FlinkTwitterUserInformationPipeline}
-import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.io.Source
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
import org.scalatest.FlatSpec
import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
import org.scalatest.time.SpanSugar._
+import org.slf4j.{Logger, LoggerFactory}
import org.testng.annotations.Test
+import scala.io.Source
+
/**
- * Created by sblackmon on 3/13/16.
+ * FlinkTwitterPostsPipelineIT is an integration test for FlinkTwitterPostsPipeline.
*/
class FlinkTwitterPostsPipelineIT extends FlatSpec {
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
index 29625ca..7570bac 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -23,20 +23,18 @@ import java.nio.file.{Files, Paths}
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
-import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._
-import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterSpritzerPipeline, FlinkTwitterUserInformationPipeline}
-import org.apache.streams.examples.flink.twitter.{TwitterPostsPipelineConfiguration, TwitterSpritzerPipelineConfiguration}
+import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
+import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
+import org.scalatest.FlatSpec
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
import org.slf4j.{Logger, LoggerFactory}
import org.testng.annotations.Test
import scala.io.Source
-import org.scalatest.FlatSpec
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
-import org.scalatest.time.SpanSugar._
/**
- * Created by sblackmon on 3/13/16.
+ * FlinkTwitterSpritzerPipelineIT is an integration test for FlinkTwitterSpritzerPipeline.
*/
class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
index b4387f9..ab88d48 100644
--- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
+++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -22,25 +22,19 @@ import java.io.File
import java.nio.file.{Files, Paths}
import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.examples.flink.twitter.{TwitterSpritzerPipelineConfiguration, TwitterUserInformationPipelineConfiguration}
-import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
-import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration}
import org.scalatest.FlatSpec
-import org.scalatest._
-import org.scalatest.junit.JUnitRunner
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.io.Source
-import org.scalatest.Ignore
import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.{Seconds, Span}
import org.scalatest.time.SpanSugar._
+import org.slf4j.{Logger, LoggerFactory}
import org.testng.annotations.Test
+import scala.io.Source
+
/**
- * Created by sblackmon on 3/13/16.
+ * FlinkTwitterUserInformationPipelineIT is an integration test for FlinkTwitterUserInformationPipeline.
*/
class FlinkTwitterUserInformationPipelineIT extends FlatSpec {
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
index be79f4a..b859d60 100644
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/ElasticsearchHdfs.java
@@ -23,59 +23,54 @@ import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamBuilder;
import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
import org.apache.streams.hdfs.WebHdfsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
import org.apache.streams.local.builders.LocalStreamBuilder;
import com.google.common.collect.Maps;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
- * Copies documents into a new index
+ * Copies documents from an elasticsearch index to new-line delimited json on dfs.
*/
public class ElasticsearchHdfs implements Runnable {
- public final static String STREAMS_ID = "ElasticsearchHdfs";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class);
-
- ElasticsearchHdfsConfiguration config;
-
- public ElasticsearchHdfs() {
- this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-
- }
-
- public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) {
- this.config = reindex;
- }
+ public final static String STREAMS_ID = "ElasticsearchHdfs";
- public static void main(String[] args)
- {
- LOGGER.info(StreamsConfigurator.config.toString());
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfs.class);
- ElasticsearchHdfs backup = new ElasticsearchHdfs();
+ ElasticsearchHdfsConfiguration config;
- new Thread(backup).start();
+ public ElasticsearchHdfs() {
+ this(new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ }
- }
+ public ElasticsearchHdfs(ElasticsearchHdfsConfiguration reindex) {
+ this.config = reindex;
+ }
- @Override
- public void run() {
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
+ ElasticsearchHdfs backup = new ElasticsearchHdfs();
+ new Thread(backup).start();
+ }
- ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+ @Override
+ public void run() {
- WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination());
+ ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+ WebHdfsPersistWriter hdfsPersistWriter = new WebHdfsPersistWriter(config.getDestination());
- Map<String, Object> streamConfig = Maps.newHashMap();
- streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
- streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
- StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+ LocalRuntimeConfiguration localRuntimeConfiguration =
+ StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+ StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
- builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
- builder.addStreamsPersistWriter(WebHdfsPersistWriter.class.getCanonicalName(), hdfsPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
- builder.start();
- }
+ builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
+ builder.addStreamsPersistWriter(WebHdfsPersistWriter.class.getCanonicalName(), hdfsPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
+ builder.start();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
index 375665c..caf9cbc 100644
--- a/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
+++ b/local/elasticsearch-hdfs/src/main/java/org/apache/streams/example/HdfsElasticsearch.java
@@ -23,59 +23,54 @@ import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamBuilder;
import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
import org.apache.streams.hdfs.WebHdfsPersistReader;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
import org.apache.streams.local.builders.LocalStreamBuilder;
import com.google.common.collect.Maps;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
- * Copies documents into a new index
+ * Copies documents from new-line delimited json on dfs to an elasticsearch index.
*/
public class HdfsElasticsearch implements Runnable {
- public final static String STREAMS_ID = "HdfsElasticsearch";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class);
-
- HdfsElasticsearchConfiguration config;
-
- public HdfsElasticsearch() {
- this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
-
- }
-
- public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) {
- this.config = reindex;
- }
+ public final static String STREAMS_ID = "HdfsElasticsearch";
- public static void main(String[] args)
- {
- LOGGER.info(StreamsConfigurator.config.toString());
+ private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearch.class);
- HdfsElasticsearch restore = new HdfsElasticsearch();
+ HdfsElasticsearchConfiguration config;
- new Thread(restore).start();
+ public HdfsElasticsearch() {
+ this(new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ }
- }
+ public HdfsElasticsearch(HdfsElasticsearchConfiguration reindex) {
+ this.config = reindex;
+ }
- @Override
- public void run() {
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
+ HdfsElasticsearch restore = new HdfsElasticsearch();
+ new Thread(restore).start();
+ }
- WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource());
+ @Override
+ public void run() {
- ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+ WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(config.getSource());
+ ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
- Map<String, Object> streamConfig = Maps.newHashMap();
- streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
- streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000 * 1000);
- StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+ LocalRuntimeConfiguration localRuntimeConfiguration =
+ StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+ StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
- builder.newPerpetualStream(WebHdfsPersistReader.class.getCanonicalName(), webHdfsPersistReader);
- builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, WebHdfsPersistReader.class.getCanonicalName());
- builder.start();
- }
+ builder.newPerpetualStream(WebHdfsPersistReader.class.getCanonicalName(), webHdfsPersistReader);
+ builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, WebHdfsPersistReader.class.getCanonicalName());
+ builder.start();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
index 437ebf6..9b70440 100644
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/ElasticsearchHdfsIT.java
@@ -28,7 +28,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
-
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -50,56 +49,54 @@ import static org.hamcrest.core.Is.is;
import static org.testng.Assert.assertNotEquals;
/**
- * Test copying documents between hdfs and elasticsearch
+ * ElasticsearchHdfsIT is an integration test for ElasticsearchHdfs.
*/
public class ElasticsearchHdfsIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class);
-
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchHdfsIT.class);
- protected ElasticsearchHdfsConfiguration testConfiguration;
- protected Client testClient;
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- private int count = 0;
+ protected ElasticsearchHdfsConfiguration testConfiguration;
+ protected Client testClient;
- @BeforeClass
- public void prepareTest() throws Exception {
+ private int count = 0;
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/ElasticsearchHdfsIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe);
- testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+ @BeforeClass
+ public void prepareTest() throws Exception {
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/ElasticsearchHdfsIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(ElasticsearchHdfsConfiguration.class).detectConfiguration(typesafe);
+ testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertThat(indicesExistsResponse.isExists(), is(true));
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
- .setTypes(testConfiguration.getSource().getTypes().get(0));
- SearchResponse countResponse = countRequest.execute().actionGet();
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertThat(indicesExistsResponse.isExists(), is(true));
- count = (int)countResponse.getHits().getTotalHits();
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+ .setTypes(testConfiguration.getSource().getTypes().get(0));
+ SearchResponse countResponse = countRequest.execute().actionGet();
- assertNotEquals(count, 0);
- }
+ count = (int)countResponse.getHits().getTotalHits();
- @Test
- public void ElasticsearchHdfsIT() throws Exception {
+ assertNotEquals(count, 0);
+ }
- ElasticsearchHdfs backup = new ElasticsearchHdfs(testConfiguration);
+ @Test
+ public void ElasticsearchHdfsIT() throws Exception {
- backup.run();
+ ElasticsearchHdfs backup = new ElasticsearchHdfs(testConfiguration);
+ backup.run();
- // assert lines in file
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
index a629025..d5f6a29 100644
--- a/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
+++ b/local/elasticsearch-hdfs/src/test/java/org/apache/streams/example/test/HdfsElasticsearchIT.java
@@ -28,7 +28,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
-
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -52,59 +51,59 @@ import static org.testng.Assert.assertNotEquals;
import static org.testng.AssertJUnit.assertTrue;
/**
- * Test copying documents between hdfs and elasticsearch
+ * HdfsElasticsearchIT is an integration test for HdfsElasticsearch.
*/
public class HdfsElasticsearchIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearchIT.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(HdfsElasticsearchIT.class);
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- protected HdfsElasticsearchConfiguration testConfiguration;
- protected Client testClient;
+ protected HdfsElasticsearchConfiguration testConfiguration;
+ protected Client testClient;
- @BeforeClass
- public void prepareTest() throws Exception {
+ @BeforeClass
+ public void prepareTest() throws Exception {
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/HdfsElasticsearchIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe);
- testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/HdfsElasticsearchIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(HdfsElasticsearchConfiguration.class).detectConfiguration(typesafe);
+ testClient = ElasticsearchClientManager.getInstance(testConfiguration.getDestination()).client();
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- if(indicesExistsResponse.isExists()) {
- DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getDestination().getIndex());
- DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
- assertTrue(deleteIndexResponse.isAcknowledged());
- };
- }
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ if(indicesExistsResponse.isExists()) {
+ DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getDestination().getIndex());
+ DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+ assertTrue(deleteIndexResponse.isAcknowledged());
+ };
+ }
- @Test
- public void ElasticsearchHdfsIT() throws Exception {
+ @Test
+ public void ElasticsearchHdfsIT() throws Exception {
- HdfsElasticsearch restore = new HdfsElasticsearch(testConfiguration);
+ HdfsElasticsearch restore = new HdfsElasticsearch(testConfiguration);
- restore.run();
+ restore.run();
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertTrue(indicesExistsResponse.isExists());
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getDestination().getIndex());
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertTrue(indicesExistsResponse.isExists());
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getDestination().getIndex())
- .setTypes(testConfiguration.getDestination().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getDestination().getIndex())
+ .setTypes(testConfiguration.getDestination().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
- assertEquals(countResponse.getHits().getTotalHits(), 89);
+ assertEquals(countResponse.getHits().getTotalHits(), 89);
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf b/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
index 80ef53a..42cb01c 100644
--- a/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
+++ b/local/elasticsearch-hdfs/src/test/resources/ElasticsearchHdfsIT.conf
@@ -27,3 +27,4 @@ destination {
path = "target/test-classes"
writerPath = "elasticsearch_hdfs_it"
}
+taskTimeoutMs = 60000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
----------------------------------------------------------------------
diff --git a/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf b/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
index 072a024..30be89b 100644
--- a/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
+++ b/local/elasticsearch-hdfs/src/test/resources/HdfsElasticsearchIT.conf
@@ -28,4 +28,5 @@ destination {
type = "activity"
refresh = true
forceUseConfig = true
-}
\ No newline at end of file
+}
+taskTimeoutMs = 60000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
index 676a272..476e369 100644
--- a/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
+++ b/local/elasticsearch-reindex/src/main/java/org/apache/streams/example/ElasticsearchReindex.java
@@ -23,59 +23,59 @@ import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamBuilder;
import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.local.LocalRuntimeConfiguration;
import org.apache.streams.local.builders.LocalStreamBuilder;
import com.google.common.collect.Maps;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
- * Copies documents into a new index
+ * Copies documents from the source index to the destination index.
*/
public class ElasticsearchReindex implements Runnable {
- public final static String STREAMS_ID = "ElasticsearchReindex";
+ public final static String STREAMS_ID = "ElasticsearchReindex";
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindex.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindex.class);
- ElasticsearchReindexConfiguration config;
+ ElasticsearchReindexConfiguration config;
- public ElasticsearchReindex() {
- this(new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
+ public ElasticsearchReindex() {
+ this(new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig()));
- }
+ }
- public ElasticsearchReindex(ElasticsearchReindexConfiguration reindex) {
- this.config = reindex;
- }
+ public ElasticsearchReindex(ElasticsearchReindexConfiguration reindex) {
+ this.config = reindex;
+ }
- public static void main(String[] args)
- {
- LOGGER.info(StreamsConfigurator.config.toString());
+ public static void main(String[] args)
+ {
+ LOGGER.info(StreamsConfigurator.config.toString());
- ElasticsearchReindex reindex = new ElasticsearchReindex();
+ ElasticsearchReindex reindex = new ElasticsearchReindex();
- new Thread(reindex).start();
+ new Thread(reindex).start();
- }
+ }
- @Override
- public void run() {
+ @Override
+ public void run() {
- ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
+ ElasticsearchPersistReader elasticsearchPersistReader = new ElasticsearchPersistReader(config.getSource());
- ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
+ ElasticsearchPersistWriter elasticsearchPersistWriter = new ElasticsearchPersistWriter(config.getDestination());
- Map<String, Object> streamConfig = Maps.newHashMap();
- streamConfig.put(LocalStreamBuilder.STREAM_IDENTIFIER_KEY, STREAMS_ID);
- streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 7 * 24 * 60 * 1000);
- StreamBuilder builder = new LocalStreamBuilder(1000, streamConfig);
+ LocalRuntimeConfiguration localRuntimeConfiguration =
+ StreamsJacksonMapper.getInstance().convertValue(StreamsConfigurator.detectConfiguration(), LocalRuntimeConfiguration.class);
+ StreamBuilder builder = new LocalStreamBuilder(localRuntimeConfiguration);
- builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
- builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
- builder.start();
- }
+ builder.newPerpetualStream(ElasticsearchPersistReader.class.getCanonicalName(), elasticsearchPersistReader);
+ builder.addStreamsPersistWriter(ElasticsearchPersistWriter.class.getCanonicalName(), elasticsearchPersistWriter, 1, ElasticsearchPersistReader.class.getCanonicalName());
+ builder.start();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
index 3260949..04d7cd6 100644
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexChildIT.java
@@ -28,7 +28,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
-
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -50,64 +49,64 @@ import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertNotEquals;
/**
- * Test copying parent/child associated documents between two indexes on same cluster
+ * Test copying parent/child associated documents between two indexes on same cluster.
*/
public class ElasticsearchReindexChildIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- protected ElasticsearchReindexConfiguration testConfiguration;
- protected Client testClient;
+ protected ElasticsearchReindexConfiguration testConfiguration;
+ protected Client testClient;
- private int count = 0;
+ private int count = 0;
- @BeforeClass
- public void prepareTest() throws Exception {
+ @BeforeClass
+ public void prepareTest() throws Exception {
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/ElasticsearchReindexChildIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
- testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/ElasticsearchReindexChildIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+ testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertNotEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.RED);
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertThat(indicesExistsResponse.isExists(), is(true));
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertThat(indicesExistsResponse.isExists(), is(true));
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
- .setTypes(testConfiguration.getSource().getTypes().get(0));
- SearchResponse countResponse = countRequest.execute().actionGet();
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+ .setTypes(testConfiguration.getSource().getTypes().get(0));
+ SearchResponse countResponse = countRequest.execute().actionGet();
- count = (int)countResponse.getHits().getTotalHits();
+ count = (int)countResponse.getHits().getTotalHits();
- assertNotEquals(count, 0);
+ assertNotEquals(count, 0);
- }
+ }
- @Test
- public void testReindex() throws Exception {
+ @Test
+ public void testReindex() throws Exception {
- ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+ ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
- reindex.run();
+ reindex.run();
- // assert lines in file
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getDestination().getIndex())
- .setTypes(testConfiguration.getDestination().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
+ // assert lines in file
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getDestination().getIndex())
+ .setTypes(testConfiguration.getDestination().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
- assertThat((int)countResponse.getHits().getTotalHits(), is(count));
+ assertThat((int)countResponse.getHits().getTotalHits(), is(count));
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java
index a324e24..6c69388 100644
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexIT.java
@@ -53,59 +53,59 @@ import static org.hamcrest.core.IsNot.not;
*/
public class ElasticsearchReindexIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- protected ElasticsearchReindexConfiguration testConfiguration;
- protected Client testClient;
+ protected ElasticsearchReindexConfiguration testConfiguration;
+ protected Client testClient;
- private int count = 0;
+ private int count = 0;
- @BeforeClass
- public void prepareTest() throws Exception {
+ @BeforeClass
+ public void prepareTest() throws Exception {
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/ElasticsearchReindexIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
- testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/ElasticsearchReindexIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+ testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertThat(indicesExistsResponse.isExists(), is(true));
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertThat(indicesExistsResponse.isExists(), is(true));
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
- .setTypes(testConfiguration.getSource().getTypes().get(0));
- SearchResponse countResponse = countRequest.execute().actionGet();
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+ .setTypes(testConfiguration.getSource().getTypes().get(0));
+ SearchResponse countResponse = countRequest.execute().actionGet();
- count = (int)countResponse.getHits().getTotalHits();
+ count = (int)countResponse.getHits().getTotalHits();
- assertThat(count, not(0));
+ assertThat(count, not(0));
- }
+ }
- @Test
- public void testReindex() throws Exception {
+ @Test
+ public void testReindex() throws Exception {
- ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+ ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
- reindex.run();
+ reindex.run();
- // assert lines in file
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getDestination().getIndex())
- .setTypes(testConfiguration.getDestination().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
+ // assert lines in file
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getDestination().getIndex())
+ .setTypes(testConfiguration.getDestination().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
- assertThat((int)countResponse.getHits().getTotalHits(), is(count));
+ assertThat((int)countResponse.getHits().getTotalHits(), is(count));
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java
index 988852a..e53c057 100644
--- a/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java
+++ b/local/elasticsearch-reindex/src/test/java/org/apache/streams/example/test/ElasticsearchReindexParentIT.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
-
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -59,68 +58,68 @@ import static org.testng.Assert.assertTrue;
*/
public class ElasticsearchReindexParentIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchReindexIT.class);
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- protected ElasticsearchReindexConfiguration testConfiguration;
- protected Client testClient;
+ protected ElasticsearchReindexConfiguration testConfiguration;
+ protected Client testClient;
- private int count = 0;
+ private int count = 0;
- @BeforeClass
- public void prepareTest() throws Exception {
+ @BeforeClass
+ public void prepareTest() throws Exception {
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/ElasticsearchReindexParentIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
- testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/ElasticsearchReindexParentIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(ElasticsearchReindexConfiguration.class).detectConfiguration(typesafe);
+ testClient = ElasticsearchClientManager.getInstance(testConfiguration.getSource()).client();
- ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
- ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
- assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
+ ClusterHealthRequest clusterHealthRequest = Requests.clusterHealthRequest();
+ ClusterHealthResponse clusterHealthResponse = testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+ assertThat(clusterHealthResponse.getStatus(), not(ClusterHealthStatus.RED));
- IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
- IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
- assertTrue(indicesExistsResponse.isExists());
+ IndicesExistsRequest indicesExistsRequest = Requests.indicesExistsRequest(testConfiguration.getSource().getIndexes().get(0));
+ IndicesExistsResponse indicesExistsResponse = testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+ assertTrue(indicesExistsResponse.isExists());
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
- .setTypes(testConfiguration.getSource().getTypes().get(0));
- SearchResponse countResponse = countRequest.execute().actionGet();
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getSource().getIndexes().get(0))
+ .setTypes(testConfiguration.getSource().getTypes().get(0));
+ SearchResponse countResponse = countRequest.execute().actionGet();
- count = (int)countResponse.getHits().getTotalHits();
+ count = (int)countResponse.getHits().getTotalHits();
- PutIndexTemplateRequestBuilder putTemplateRequestBuilder = testClient.admin().indices().preparePutTemplate("mappings");
- URL templateURL = ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json");
- ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class);
- String templateSource = MAPPER.writeValueAsString(template);
- putTemplateRequestBuilder.setSource(templateSource);
+ PutIndexTemplateRequestBuilder putTemplateRequestBuilder = testClient.admin().indices().preparePutTemplate("mappings");
+ URL templateURL = ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json");
+ ObjectNode template = MAPPER.readValue(templateURL, ObjectNode.class);
+ String templateSource = MAPPER.writeValueAsString(template);
+ putTemplateRequestBuilder.setSource(templateSource);
- testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();
+ testClient.admin().indices().putTemplate(putTemplateRequestBuilder.request()).actionGet();
- assertThat(count, not(0));
+ assertThat(count, not(0));
- }
+ }
- @Test
- public void testReindex() throws Exception {
+ @Test
+ public void testReindex() throws Exception {
- ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
+ ElasticsearchReindex reindex = new ElasticsearchReindex(testConfiguration);
- reindex.run();
+ reindex.run();
- // assert lines in file
- SearchRequestBuilder countRequest = testClient
- .prepareSearch(testConfiguration.getDestination().getIndex())
- .setTypes(testConfiguration.getDestination().getType());
- SearchResponse countResponse = countRequest.execute().actionGet();
+ // assert lines in file
+ SearchRequestBuilder countRequest = testClient
+ .prepareSearch(testConfiguration.getDestination().getIndex())
+ .setTypes(testConfiguration.getDestination().getType());
+ SearchResponse countResponse = countRequest.execute().actionGet();
- assertThat((int)countResponse.getHits().getTotalHits(), is(count));
+ assertThat((int)countResponse.getHits().getTotalHits(), is(count));
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
index 424d725..159b7d5 100644
--- a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
+++ b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexChildIT.conf
@@ -33,4 +33,5 @@
"type": "activity",
"forceUseConfig": true
}
+ taskTimeoutMs = 60000
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
index 0062f0f..faac23e 100644
--- a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
+++ b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexIT.conf
@@ -33,4 +33,5 @@
"type": "activity",
"forceUseConfig": true
}
+ taskTimeoutMs = 60000
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/cb7c0b9d/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
----------------------------------------------------------------------
diff --git a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
index 424d725..159b7d5 100644
--- a/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
+++ b/local/elasticsearch-reindex/src/test/resources/ElasticsearchReindexParentIT.conf
@@ -33,4 +33,5 @@
"type": "activity",
"forceUseConfig": true
}
+ taskTimeoutMs = 60000
}