You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2020/08/12 16:02:53 UTC

[streams] 03/04: use newer flink file sink

This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git

commit 132504b7f22ced3312c1cb7fc050a678a1693597
Author: sblackmon <sb...@apache.org>
AuthorDate: Sun Aug 9 06:57:02 2020 -0500

    use newer flink file sink
---
 .../apache/streams/examples/flink/FlinkBase.scala  | 10 +++++++++
 .../collection/FlinkTwitterFollowingPipeline.scala |  4 +++-
 .../collection/FlinkTwitterPostsPipeline.scala     |  3 ++-
 .../collection/FlinkTwitterSpritzerPipeline.scala  |  6 ++++--
 .../FlinkTwitterUserInformationPipeline.scala      |  4 +++-
 .../src/test/resources/testng.xml                  |  2 +-
 .../FlinkTwitterFollowingPipelineFollowersIT.scala | 14 ++++++++-----
 .../FlinkTwitterFollowingPipelineFriendsIT.scala   | 14 ++++++++-----
 .../twitter/test/FlinkTwitterPostsPipelineIT.scala | 22 ++++++++++++--------
 .../test/FlinkTwitterSpritzerPipelineIT.scala      | 24 ++++++++++++++--------
 .../FlinkTwitterUserInformationPipelineIT.scala    | 22 ++++++++++++--------
 11 files changed, 84 insertions(+), 41 deletions(-)

diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index 3acca05..86e99ab 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -19,12 +19,15 @@
 package org.apache.streams.examples.flink
 
 import java.net.MalformedURLException
+import java.util.concurrent.TimeUnit
 
 import com.typesafe.config.Config
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
 import org.apache.streams.flink.{FlinkBatchConfiguration, FlinkStreamingConfiguration, StreamsFlinkConfiguration}
@@ -48,6 +51,13 @@ trait FlinkBase {
   var executionEnvironment: ExecutionEnvironment = _
   var streamExecutionEnvironment: StreamExecutionEnvironment = _
 
+  final val basePathBucketAssigner : BasePathBucketAssigner[String] = new BasePathBucketAssigner()
+  final val rollingPolicy: DefaultRollingPolicy[String, String] = DefaultRollingPolicy.builder()
+    .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
+    .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
+    .withMaxPartSize(1024 * 1024 * 1024)
+    .build()
+
   /*
    Basic stuff for every flink job
    */
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 6f20216..96a7739 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -114,6 +114,7 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
 class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguration = new StreamsConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectCustomConfiguration()) extends Runnable with java.io.Serializable {
 
   import FlinkTwitterFollowingPipeline._
+  import FlinkTwitterFollowingPipeline.rollingPolicy
 
   override def run(): Unit = {
 
@@ -159,7 +160,8 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
 
     val fileSink : StreamingFileSink[String] = StreamingFileSink.
       forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
-      build()
+      withRollingPolicy(rollingPolicy).
+      withBucketAssigner(basePathBucketAssigner).build();
 
     if( config.getTest == true ) {
       keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index ed45a9f..aad7f32 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -162,7 +162,8 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
 
     val fileSink : StreamingFileSink[String] = StreamingFileSink.
       forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
-      build()
+      withRollingPolicy(rollingPolicy).
+      withBucketAssigner(basePathBucketAssigner).build();
 
     if( config.getTest == true ) {
       keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index 6235672..0a3de4f 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -35,8 +35,8 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.KeyedStream
 import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.KeyedStream
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.streams.config.ComponentConfigurator
 import org.apache.streams.config.StreamsConfigurator
@@ -136,7 +136,9 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
 
     val fileSink : StreamingFileSink[String] = StreamingFileSink.
       forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
-      build()
+      withRollingPolicy(rollingPolicy).
+      withBucketAssigner(basePathBucketAssigner).build();
+
 
     if( config.getTest == true ) {
       keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 21bc911..080ce57 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -165,7 +165,9 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
 
     val fileSink : StreamingFileSink[String] = StreamingFileSink.
       forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
-      build()
+      withRollingPolicy(rollingPolicy).
+      withBucketAssigner(basePathBucketAssigner).build();
+
 
     if( config.getTest == true ) {
       keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml
index bdb250d..6c11e30 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml
@@ -19,7 +19,7 @@
   ~ under the License.
   -->
 
-<suite name="ExampleFlinkITs" preserve-order="true">
+<suite name="ExampleFlinkITs" allow-return-values="true" preserve-order="true">
 
     <test name="FlinkTwitterUserInformationPipelineIT">
         <classes>
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
index bfc6940..22ab957 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
@@ -19,16 +19,20 @@
 package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
 
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigParseOptions
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
 import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
-import org.scalatest.FlatSpec
+import org.scalatest.Assertions._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 import org.testng.annotations.Test
 
 import scala.io.Source
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
index 6d52d44..e71dbaf 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
@@ -19,16 +19,20 @@
 package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
 
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigParseOptions
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
 import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
-import org.scalatest.FlatSpec
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
-import org.slf4j.{Logger, LoggerFactory}
+import org.scalatest.Assertions._
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 import org.testng.annotations.Test
 
 import scala.io.Source
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
index a43c758..c061d43 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -19,16 +19,20 @@
 package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
 
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigParseOptions
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
 import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
-import org.scalatest.FlatSpec
+import org.scalatest.Assertions._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 import org.testng.annotations.Test
 
 import scala.io.Source
@@ -63,9 +67,11 @@ class FlinkTwitterPostsPipelineIT {
 
     eventually (timeout(30 seconds), interval(1 seconds)) {
       assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
-      assert(
-        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
-          >= 200)
+      val lines = Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.toList
+      assert(lines.size > 200)
+      lines foreach {
+        line => assert( line.contains("created_at") )
+      }
     }
 
   }
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
index 8d45a66..00e32e6 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -19,16 +19,20 @@
 package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
 
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigParseOptions
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
 import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
-import org.scalatest.FlatSpec
+import org.scalatest.Assertions._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 import org.testng.annotations.Test
 
 import scala.io.Source
@@ -36,7 +40,7 @@ import scala.io.Source
 /**
   * FlinkTwitterSpritzerPipelineIT is an integration test for FlinkTwitterSpritzerPipeline.
   */
-class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
+class FlinkTwitterSpritzerPipelineIT {
 
   private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterSpritzerPipelineIT])
 
@@ -64,9 +68,11 @@ class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
 
     eventually (timeout(60 seconds), interval(1 seconds)) {
       assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
-      assert(
-        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
-          >= 10)
+      val lines = Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.toList
+      assert(lines.size > 10)
+      lines foreach {
+        line => assert( line.contains("created_at") )
+      }
     }
 
   }
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
index 48b876a..fc582e1 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -19,16 +19,20 @@
 package org.apache.streams.examples.flink.twitter.test
 
 import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
 
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigParseOptions
+import org.apache.streams.config.StreamsConfigurator
 import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
 import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
-import org.scalatest.FlatSpec
+import org.scalatest.Assertions._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 import org.testng.annotations.Test
 
 import scala.io.Source
@@ -63,9 +67,11 @@ class FlinkTwitterUserInformationPipelineIT {
 
     eventually (timeout(30 seconds), interval(1 seconds)) {
       assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
-      assert(
-        Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
-          > 500)
+      val lines = Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.toList
+      assert(lines.size > 500)
+      lines foreach {
+        line => assert( line.contains("created_at") )
+      }
     }
 
   }