You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sm...@apache.org on 2016/10/27 01:33:58 UTC

[3/3] incubator-streams-examples git commit: Trivial Fix, to trigger github mirroring resync

Trivial Fix, to trigger github mirroring resync


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/8d1c9fad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/8d1c9fad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/8d1c9fad

Branch: refs/heads/master
Commit: 8d1c9fad609125ce8cd34ab164e2de073562ff5f
Parents: 64f8d6e
Author: smarthi <sm...@apache.org>
Authored: Wed Oct 26 21:33:26 2016 -0400
Committer: smarthi <sm...@apache.org>
Committed: Wed Oct 26 21:33:26 2016 -0400

----------------------------------------------------------------------
 .../streams/examples/flink/FlinkBase.scala      | 32 ++++++++++----------
 .../FlinkTwitterFollowingPipeline.scala         | 14 +++------
 .../collection/FlinkTwitterPostsPipeline.scala  | 11 +++----
 .../FlinkTwitterSpritzerPipeline.scala          | 15 ++++-----
 .../FlinkTwitterUserInformationPipeline.scala   | 14 ++++-----
 5 files changed, 36 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8d1c9fad/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index a94dd61..49ca5b9 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -50,7 +50,7 @@ trait FlinkBase {
    */
   def main(args: Array[String]): Unit = {
     // if only one argument, use it as the config URL
-    if( args.size > 0 ) {
+    if( args.length > 0 ) {
       BASELOGGER.info("Args: {}", args)
       configUrl = args(0)
       setup(configUrl)
@@ -79,7 +79,7 @@ trait FlinkBase {
       typesafe = StreamsConfigurator.getConfig
     }
 
-    return setup(typesafe)
+    setup(typesafe)
 
   }
 
@@ -91,13 +91,13 @@ trait FlinkBase {
     if( this.typesafe.getString("mode").equals("streaming")) {
       val streamingConfiguration: FlinkStreamingConfiguration =
         new ComponentConfigurator[FlinkStreamingConfiguration](classOf[FlinkStreamingConfiguration]).detectConfiguration(typesafe)
-      return setupStreaming(streamingConfiguration)
+      setupStreaming(streamingConfiguration)
     } else if( this.typesafe.getString("mode").equals("batch")) {
       val batchConfiguration: FlinkBatchConfiguration =
         new ComponentConfigurator[FlinkBatchConfiguration](classOf[FlinkBatchConfiguration]).detectConfiguration(typesafe)
-      return setupBatch(batchConfiguration)
+      setupBatch(batchConfiguration)
     } else {
-      return false;
+      false
     }
   }
 
@@ -123,7 +123,7 @@ trait FlinkBase {
     if( streamExecutionEnvironment == null )
       streamExecutionEnvironment = streamEnvironment(streamingConfiguration)
 
-    return false
+    false
 
   }
 
@@ -138,17 +138,17 @@ trait FlinkBase {
     if( executionEnvironment == null )
       executionEnvironment = batchEnvironment(batchConfiguration)
 
-    return true
+    true
 
   }
 
   def batchEnvironment(config: FlinkBatchConfiguration = new FlinkBatchConfiguration()) : ExecutionEnvironment = {
     if (config.getTest == false && config.getLocal == false) {
       val env = ExecutionEnvironment.getExecutionEnvironment
-      return env
+      env
     } else {
       val env = ExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt)
-      return env
+      env
     }
   }
 
@@ -156,7 +156,7 @@ trait FlinkBase {
     if( config.getTest == false && config.getLocal == false) {
       val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-      env.setRestartStrategy(RestartStrategies.noRestart());
+      env.setRestartStrategy(RestartStrategies.noRestart())
 
       // start a checkpoint every hour
       env.enableCheckpointing(config.getCheckpointIntervalMs)
@@ -169,10 +169,10 @@ trait FlinkBase {
       // allow only one checkpoint to be in progress at the same time
       env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
 
-      return env
+      env
     }
 
-    else return StreamExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt)
+    else StreamExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt)
   }
 
   def buildReaderPath(configObject: HdfsReaderConfiguration) : String = {
@@ -188,7 +188,7 @@ trait FlinkBase {
     } else {
       throw new Exception("scheme not recognized: " + configObject.getScheme)
     }
-    return inPathBuilder
+    inPathBuilder
   }
 
   def buildWriterPath(configObject: HdfsWriterConfiguration) : String = {
@@ -204,15 +204,15 @@ trait FlinkBase {
     } else {
       throw new Exception("output scheme not recognized: " + configObject.getScheme)
     }
-    return outPathBuilder
+    outPathBuilder
   }
 
   def toProviderId(input : String) : String = {
     if( input.startsWith("@") )
       return input.substring(1)
     if( input.contains(':'))
-      return input.substring(input.lastIndexOf(':')+1)
-    else return input
+      input.substring(input.lastIndexOf(':')+1)
+    else input
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8d1c9fad/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 0e77ef8..a5a4f72 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -43,12 +43,6 @@ import org.apache.flink.api.scala._
 
 import scala.collection.JavaConversions._
 
-/**
- * Created by sblackmon on 4/20/16.
- */
-/**
- * Created by sblackmon on 3/15/16.
- */
 object FlinkTwitterFollowingPipeline extends FlinkBase {
 
     val STREAMS_ID: String = "FlinkTwitterFollowingPipeline"
@@ -59,7 +53,7 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
     override def main(args: Array[String]) = {
     super.main(args)
     val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe)
-    if( setup(jobConfig) == false ) System.exit(1)
+    if( !setup(jobConfig) ) System.exit(1)
     val pipeline: FlinkTwitterFollowingPipeline = new FlinkTwitterFollowingPipeline(jobConfig)
     val thread = new Thread(pipeline)
     thread.start()
@@ -100,7 +94,7 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
         Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
         Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
 
-        return true
+        true
 
     }
 
@@ -114,7 +108,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
 
         val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
 
-        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
         env.setNumberOfExecutionRetries(0)
 
         val inPath = buildReaderPath(config.getSource)
@@ -140,7 +134,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
             jsons.addSink(new RollingSink[String](outPath)).setParallelism(3)
         else
             jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
-              .setParallelism(env.getParallelism);
+              .setParallelism(env.getParallelism)
 
         // if( test == true ) jsons.print();
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8d1c9fad/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index 6a070d0..8bb2997 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -52,9 +52,6 @@ import org.apache.flink.api.scala._
 
 import scala.collection.JavaConversions._
 
-/**
-  * Created by sblackmon on 7/29/15.
-  */
 object FlinkTwitterPostsPipeline extends FlinkBase {
 
   val STREAMS_ID: String = "FlinkTwitterPostsPipeline"
@@ -65,7 +62,7 @@ object FlinkTwitterPostsPipeline extends FlinkBase {
   override def main(args: Array[String]) = {
     super.main(args)
     val jobConfig = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafe)
-    if( setup(jobConfig) == false ) System.exit(1)
+    if( !setup(jobConfig) ) System.exit(1)
     val pipeline: FlinkTwitterPostsPipeline = new FlinkTwitterPostsPipeline(jobConfig)
     val thread = new Thread(pipeline)
     thread.start()
@@ -106,7 +103,7 @@ object FlinkTwitterPostsPipeline extends FlinkBase {
     Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
     Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
 
-    return true
+    true
 
   }
 
@@ -120,7 +117,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
 
     val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
 
-    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
     env.setNumberOfExecutionRetries(0)
 
     val inPath = buildReaderPath(config.getSource)
@@ -148,7 +145,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
       jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
     else
       jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
-        .setParallelism(env.getParallelism);
+        .setParallelism(env.getParallelism)
 
     // if( test == true ) jsons.print();
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8d1c9fad/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index 7e7cc5c..56d892b 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -45,9 +45,6 @@ import org.apache.streams.twitter.converter.TwitterDateTimeFormat
 
 import scala.collection.JavaConversions._
 
-/**
-  * Created by sblackmon on 7/29/15.
-  */
 object FlinkTwitterSpritzerPipeline extends FlinkBase {
 
   val STREAMS_ID: String = "FlinkTwitterSpritzerPipeline"
@@ -58,7 +55,7 @@ object FlinkTwitterSpritzerPipeline extends FlinkBase {
   override def main(args: Array[String]) = {
     super.main(args)
     val jobConfig = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(typesafe)
-    if( setup(jobConfig) == false ) System.exit(1)
+    if( !setup(jobConfig) ) System.exit(1)
     val pipeline: FlinkTwitterSpritzerPipeline = new FlinkTwitterSpritzerPipeline(jobConfig)
     val thread = new Thread(pipeline)
     thread.start()
@@ -93,7 +90,7 @@ object FlinkTwitterSpritzerPipeline extends FlinkBase {
     Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
     Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
 
-    return true
+    true
 
   }
 
@@ -109,18 +106,18 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
 
     val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
 
-    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
     env.setNumberOfExecutionRetries(0)
 
     val outPath = buildWriterPath(config.getDestination)
 
-    val streamSource : DataStream[String] = env.addSource(spritzerSource);
+    val streamSource : DataStream[String] = env.addSource(spritzerSource)
 
     if( config.getTest == false )
       streamSource.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
     else
       streamSource.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
-        .setParallelism(env.getParallelism);
+        .setParallelism(env.getParallelism)
 
     // if( test == true ) jsons.print();
 
@@ -160,7 +157,7 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
     }
 
     override def stop(): Unit = {
-      close();
+      close()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/8d1c9fad/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 0ff8648..01425f6 100644
--- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -45,9 +45,6 @@ import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConversions._
 
-/**
-  * Created by sblackmon on 3/15/16.
-  */
 object FlinkTwitterUserInformationPipeline extends FlinkBase {
 
   val STREAMS_ID: String = "FlinkTwitterUserInformationPipeline"
@@ -58,7 +55,7 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase {
   override def main(args: Array[String]) = {
     super.main(args)
     val jobConfig = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe)
-    if( setup(jobConfig) == false ) System.exit(1)
+    if( !setup(jobConfig) ) System.exit(1)
     val pipeline: FlinkTwitterUserInformationPipeline = new FlinkTwitterUserInformationPipeline(jobConfig)
     val thread = new Thread(pipeline)
     thread.start()
@@ -99,7 +96,7 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase {
     Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey))
     Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret))
 
-    return true
+    true
 
   }
 
@@ -113,7 +110,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
 
     val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration]))
 
-    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
     env.setNumberOfExecutionRetries(0)
 
     val inPath = buildReaderPath(config.getSource)
@@ -142,7 +139,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
       jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs")
     else
       jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
-        .setParallelism(env.getParallelism);
+        .setParallelism(env.getParallelism)
 
     LOGGER.info("StreamExecutionEnvironment: {}", env.toString )
 
@@ -150,7 +147,8 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
   }
 
   class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] {
-    override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {if( input.size > 0 )
+    override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {
+      if( input.nonEmpty )
         out.collect(input.map(id => toProviderId(id)).toList)
     }
   }