You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@streams.apache.org by sm...@apache.org on 2016/10/27 01:33:58 UTC
[3/3] incubator-streams-examples git commit: Trivial Fix, to trigger github mirroring resync
Trivial Fix, to trigger github mirroring resync
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/8d1c9fad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/8d1c9fad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/8d1c9fad
Branch: refs/heads/master
Commit: 8d1c9fad609125ce8cd34ab164e2de073562ff5f
Parents: 64f8d6e
Author: smarthi <sm...@apache.org>
Authored: Wed Oct 26 21:33:26 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Wed Oct 26 21:33:26 2016 -0400
----------------------------------------------------------------------
.../streams/examples/flink/FlinkBase.scala | 32 ++++++++++----------
.../FlinkTwitterFollowingPipeline.scala | 14 +++------
.../collection/FlinkTwitterPostsPipeline.scala | 11 +++----
.../FlinkTwitterSpritzerPipeline.scala | 15 ++++-----
.../FlinkTwitterUserInformationPipeline.scala | 14 ++++-----
5 files changed, 36 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8d1c9fad/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index a94dd61..49ca5b9 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -50,7 +50,7 @@ trait FlinkBase {
*/
def main(args: Array[String]): Unit = {
// if only one argument, use it as the config URL
- if( args.size > 0 ) {
+ if( args.length > 0 ) {
BASELOGGER.info("Args: {}", args)
configUrl = args(0)
setup(configUrl)
@@ -79,7 +79,7 @@ trait FlinkBase {
typesafe = StreamsConfigurator.getConfig
}
- return setup(typesafe)
+ setup(typesafe)
}
@@ -91,13 +91,13 @@ trait FlinkBase {
if( this.typesafe.getString("mode").equals("streaming")) {
val streamingConfiguration: FlinkStreamingConfiguration =
new ComponentConfigurator[FlinkStreamingConfiguration](classOf[FlinkStreamingConfiguration]).detectConfiguration(typesafe)
- return setupStreaming(streamingConfiguration)
+ setupStreaming(streamingConfiguration)
} else if( this.typesafe.getString("mode").equals("batch")) {
val batchConfiguration: FlinkBatchConfiguration =
new ComponentConfigurator[FlinkBatchConfiguration](classOf[FlinkBatchConfiguration]).detectConfiguration(typesafe)
- return setupBatch(batchConfiguration)
+ setupBatch(batchConfiguration)
} else {
- return false;
+ false
}
}
@@ -123,7 +123,7 @@ trait FlinkBase {
if( streamExecutionEnvironment == null )
streamExecutionEnvironment = streamEnvironment(streamingConfiguration)
- return false
+ false
}
@@ -138,17 +138,17 @@ trait FlinkBase {
if( executionEnvironment == null )
executionEnvironment = batchEnvironment(batchConfiguration)
- return true
+ true
}
def batchEnvironment(config: FlinkBatchConfiguration = new FlinkBatchConfiguration()) : ExecutionEnvironment = {
if (config.getTest == false && config.getLocal == false) {
val env = ExecutionEnvironment.getExecutionEnvironment
- return env
+ env
} else {
val env = ExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt)
- return env
+ env
}
}
@@ -156,7 +156,7 @@ trait FlinkBase {
if( config.getTest == false && config.getLocal == false) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setRestartStrategy(RestartStrategies.noRestart());
+ env.setRestartStrategy(RestartStrategies.noRestart())
// start a checkpoint every hour
env.enableCheckpointing(config.getCheckpointIntervalMs)
@@ -169,10 +169,10 @@ trait FlinkBase {
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
- return env
+ env
}
- else return StreamExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt)
+ else StreamExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt)
}
def buildReaderPath(configObject: HdfsReaderConfiguration) : String = {
@@ -188,7 +188,7 @@ trait FlinkBase {
} else {
throw new Exception("scheme not recognized: " + configObject.getScheme)
}
- return inPathBuilder
+ inPathBuilder
}
def buildWriterPath(configObject: HdfsWriterConfiguration) : String = {
@@ -204,15 +204,15 @@ trait FlinkBase {
} else {
throw new Exception("output scheme not recognized: " + configObject.getScheme)
}
- return outPathBuilder
+ outPathBuilder
}
def toProviderId(input : String) : String = {
if( input.startsWith("@") )
return input.substring(1)
if( input.contains(':'))
- return input.substring(input.lastIndexOf(':')+1)
- else return input
+ input.substring(input.lastIndexOf(':')+1)
+ else input
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8d1c9fad/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 0e77ef8..a5a4f72 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -43,12 +43,6 @@ import org.apache.flink.api.scala._
import scala.collection.JavaConversions._
-/**
- * Created by sblackmon on 4/20/16.
- */
-/**
- * Created by sblackmon on 3/15/16.
- */
object FlinkTwitterFollowingPipeline extends FlinkBase {
val STREAMS_ID: String = "FlinkTwitterFollowingPipeline"
@@ -59,7 +53,7 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
override def main(args: Array[String]) = {
super.main(args)
val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
- if( setup(jobConfig) == false ) System.exit(1)
+ if( !setup(jobConfig) ) System.exit(1)
val pipeline: FlinkTwitterFollowingPipeline = new FlinkTwitterFollowingPipeline(jobConfig)
val thread = new Thread(pipeline)
thread.start()
@@ -100,7 +94,7 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
- return true
+ true
}
@@ -114,7 +108,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env.setNumberOfExecutionRetries(0)
val inPath = buildReaderPath(config.getSource)
@@ -140,7 +134,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
jsons.addSink(new RollingSink[String](outPath)).setParallelism(3)
else
jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
- .setParallelism(env.getParallelism);
+ .setParallelism(env.getParallelism)
// if( test == true ) jsons.print();
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8d1c9fad/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index 6a070d0..8bb2997 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -52,9 +52,6 @@ import org.apache.flink.api.scala._
import scala.collection.JavaConversions._
-/**
- * Created by sblackmon on 7/29/15.
- */
object FlinkTwitterPostsPipeline extends FlinkBase {
val STREAMS_ID: String = "FlinkTwitterPostsPipeline"
@@ -65,7 +62,7 @@ object FlinkTwitterPostsPipeline extends FlinkBase {
override def main(args: Array[String]) = {
super.main(args)
val jobConfig = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafe)
- if( setup(jobConfig) == false ) System.exit(1)
+ if( !setup(jobConfig) ) System.exit(1)
val pipeline: FlinkTwitterPostsPipeline = new FlinkTwitterPostsPipeline(jobConfig)
val thread = new Thread(pipeline)
thread.start()
@@ -106,7 +103,7 @@ object FlinkTwitterPostsPipeline extends FlinkBase {
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
- return true
+ true
}
@@ -120,7 +117,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env.setNumberOfExecutionRetries(0)
val inPath = buildReaderPath(config.getSource)
@@ -148,7 +145,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
else
jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
- .setParallelism(env.getParallelism);
+ .setParallelism(env.getParallelism)
// if( test == true ) jsons.print();
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8d1c9fad/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index 7e7cc5c..56d892b 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -45,9 +45,6 @@ import org.apache.streams.twitter.converter.TwitterDateTimeFormat
import scala.collection.JavaConversions._
-/**
- * Created by sblackmon on 7/29/15.
- */
object FlinkTwitterSpritzerPipeline extends FlinkBase {
val STREAMS_ID: String = "FlinkTwitterSpritzerPipeline"
@@ -58,7 +55,7 @@ object FlinkTwitterSpritzerPipeline extends FlinkBase {
override def main(args: Array[String]) = {
super.main(args)
val jobConfig = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(typesafe)
- if( setup(jobConfig) == false ) System.exit(1)
+ if( !setup(jobConfig) ) System.exit(1)
val pipeline: FlinkTwitterSpritzerPipeline = new FlinkTwitterSpritzerPipeline(jobConfig)
val thread = new Thread(pipeline)
thread.start()
@@ -93,7 +90,7 @@ object FlinkTwitterSpritzerPipeline extends FlinkBase {
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
- return true
+ true
}
@@ -109,18 +106,18 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env.setNumberOfExecutionRetries(0)
val outPath = buildWriterPath(config.getDestination)
- val streamSource : DataStream[String] = env.addSource(spritzerSource);
+ val streamSource : DataStream[String] = env.addSource(spritzerSource)
if( config.getTest == false )
streamSource.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
else
streamSource.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
- .setParallelism(env.getParallelism);
+ .setParallelism(env.getParallelism)
// if( test == true ) jsons.print();
@@ -160,7 +157,7 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
}
override def stop(): Unit = {
- close();
+ close()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8d1c9fad/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 0ff8648..01425f6 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -45,9 +45,6 @@ import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConversions._
-/**
- * Created by sblackmon on 3/15/16.
- */
object FlinkTwitterUserInformationPipeline extends FlinkBase {
val STREAMS_ID: String = "FlinkTwitterUserInformationPipeline"
@@ -58,7 +55,7 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase {
override def main(args: Array[String]) = {
super.main(args)
val jobConfig = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe)
- if( setup(jobConfig) == false ) System.exit(1)
+ if( !setup(jobConfig) ) System.exit(1)
val pipeline: FlinkTwitterUserInformationPipeline = new FlinkTwitterUserInformationPipeline(jobConfig)
val thread = new Thread(pipeline)
thread.start()
@@ -99,7 +96,7 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase {
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
- return true
+ true
}
@@ -113,7 +110,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env.setNumberOfExecutionRetries(0)
val inPath = buildReaderPath(config.getSource)
@@ -142,7 +139,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
else
jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
- .setParallelism(env.getParallelism);
+ .setParallelism(env.getParallelism)
LOGGER.info("StreamExecutionEnvironment: {}", env.toString )
@@ -150,7 +147,8 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
}
class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
- override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {if( input.size > 0 )
+ override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {
+ if( input.nonEmpty )
out.collect(input.map(id => toProviderId(id)).toList)
}
}