You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2020/08/12 16:02:53 UTC
[streams] 03/04: use newer flink file sink
This is an automated email from the ASF dual-hosted git repository.
sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git
commit 132504b7f22ced3312c1cb7fc050a678a1693597
Author: sblackmon <sb...@apache.org>
AuthorDate: Sun Aug 9 06:57:02 2020 -0500
use newer flink file sink
---
.../apache/streams/examples/flink/FlinkBase.scala | 10 +++++++++
.../collection/FlinkTwitterFollowingPipeline.scala | 4 +++-
.../collection/FlinkTwitterPostsPipeline.scala | 3 ++-
.../collection/FlinkTwitterSpritzerPipeline.scala | 6 ++++--
.../FlinkTwitterUserInformationPipeline.scala | 4 +++-
.../src/test/resources/testng.xml | 2 +-
.../FlinkTwitterFollowingPipelineFollowersIT.scala | 14 ++++++++-----
.../FlinkTwitterFollowingPipelineFriendsIT.scala | 14 ++++++++-----
.../twitter/test/FlinkTwitterPostsPipelineIT.scala | 22 ++++++++++++--------
.../test/FlinkTwitterSpritzerPipelineIT.scala | 24 ++++++++++++++--------
.../FlinkTwitterUserInformationPipelineIT.scala | 22 ++++++++++++--------
11 files changed, 84 insertions(+), 41 deletions(-)
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
index 3acca05..86e99ab 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala
@@ -19,12 +19,15 @@
package org.apache.streams.examples.flink
import java.net.MalformedURLException
+import java.util.concurrent.TimeUnit
import com.typesafe.config.Config
import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
import org.apache.streams.flink.{FlinkBatchConfiguration, FlinkStreamingConfiguration, StreamsFlinkConfiguration}
@@ -48,6 +51,13 @@ trait FlinkBase {
var executionEnvironment: ExecutionEnvironment = _
var streamExecutionEnvironment: StreamExecutionEnvironment = _
+ final val basePathBucketAssigner : BasePathBucketAssigner[String] = new BasePathBucketAssigner()
+ final val rollingPolicy: DefaultRollingPolicy[String, String] = DefaultRollingPolicy.builder()
+ .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
+ .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
+ .withMaxPartSize(1024 * 1024 * 1024)
+ .build()
+
/*
Basic stuff for every flink job
*/
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
index 6f20216..96a7739 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala
@@ -114,6 +114,7 @@ object FlinkTwitterFollowingPipeline extends FlinkBase {
class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguration = new StreamsConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectCustomConfiguration()) extends Runnable with java.io.Serializable {
import FlinkTwitterFollowingPipeline._
+ import FlinkTwitterFollowingPipeline.rollingPolicy
override def run(): Unit = {
@@ -159,7 +160,8 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio
val fileSink : StreamingFileSink[String] = StreamingFileSink.
forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
- build()
+ withRollingPolicy(rollingPolicy).
+ withBucketAssigner(basePathBucketAssigner).build();
if( config.getTest == true ) {
keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
index ed45a9f..aad7f32 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala
@@ -162,7 +162,8 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new
val fileSink : StreamingFileSink[String] = StreamingFileSink.
forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
- build()
+ withRollingPolicy(rollingPolicy).
+ withBucketAssigner(basePathBucketAssigner).build();
if( config.getTest == true ) {
keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
index 6235672..0a3de4f 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala
@@ -35,8 +35,8 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.KeyedStream
import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.KeyedStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.streams.config.ComponentConfigurator
import org.apache.streams.config.StreamsConfigurator
@@ -136,7 +136,9 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration
val fileSink : StreamingFileSink[String] = StreamingFileSink.
forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
- build()
+ withRollingPolicy(rollingPolicy).
+ withBucketAssigner(basePathBucketAssigner).build();
+
if( config.getTest == true ) {
keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
index 21bc911..080ce57 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala
@@ -165,7 +165,9 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline
val fileSink : StreamingFileSink[String] = StreamingFileSink.
forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")).
- build()
+ withRollingPolicy(rollingPolicy).
+ withBucketAssigner(basePathBucketAssigner).build();
+
if( config.getTest == true ) {
keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE)
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml
index bdb250d..6c11e30 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml
@@ -19,7 +19,7 @@
~ under the License.
-->
-<suite name="ExampleFlinkITs" preserve-order="true">
+<suite name="ExampleFlinkITs" allow-return-values="true" preserve-order="true">
<test name="FlinkTwitterUserInformationPipelineIT">
<classes>
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
index bfc6940..22ab957 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala
@@ -19,16 +19,20 @@
package org.apache.streams.examples.flink.twitter.test
import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigParseOptions
+import org.apache.streams.config.StreamsConfigurator
import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
-import org.scalatest.FlatSpec
+import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import org.testng.annotations.Test
import scala.io.Source
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
index 6d52d44..e71dbaf 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala
@@ -19,16 +19,20 @@
package org.apache.streams.examples.flink.twitter.test
import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigParseOptions
+import org.apache.streams.config.StreamsConfigurator
import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline
-import org.scalatest.FlatSpec
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
-import org.slf4j.{Logger, LoggerFactory}
+import org.scalatest.Assertions._
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import org.testng.annotations.Test
import scala.io.Source
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
index a43c758..c061d43 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala
@@ -19,16 +19,20 @@
package org.apache.streams.examples.flink.twitter.test
import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigParseOptions
+import org.apache.streams.config.StreamsConfigurator
import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration
import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline
-import org.scalatest.FlatSpec
+import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import org.testng.annotations.Test
import scala.io.Source
@@ -63,9 +67,11 @@ class FlinkTwitterPostsPipelineIT {
eventually (timeout(30 seconds), interval(1 seconds)) {
assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
- assert(
- Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
- >= 200)
+ val lines = Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.toList
+ assert(lines.size > 200)
+ lines foreach {
+ line => assert( line.contains("created_at") )
+ }
}
}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
index 8d45a66..00e32e6 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala
@@ -19,16 +19,20 @@
package org.apache.streams.examples.flink.twitter.test
import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigParseOptions
+import org.apache.streams.config.StreamsConfigurator
import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration
import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline
-import org.scalatest.FlatSpec
+import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import org.testng.annotations.Test
import scala.io.Source
@@ -36,7 +40,7 @@ import scala.io.Source
/**
* FlinkTwitterSpritzerPipelineIT is an integration test for FlinkTwitterSpritzerPipeline.
*/
-class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
+class FlinkTwitterSpritzerPipelineIT {
private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterSpritzerPipelineIT])
@@ -64,9 +68,11 @@ class FlinkTwitterSpritzerPipelineIT extends FlatSpec {
eventually (timeout(60 seconds), interval(1 seconds)) {
assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
- assert(
- Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
- >= 10)
+ val lines = Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.toList
+ assert(lines.size > 10)
+ lines foreach {
+ line => assert( line.contains("created_at") )
+ }
}
}
diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
index 48b876a..fc582e1 100644
--- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
+++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala
@@ -19,16 +19,20 @@
package org.apache.streams.examples.flink.twitter.test
import java.io.File
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
+import java.nio.file.Paths
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions}
-import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator}
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigParseOptions
+import org.apache.streams.config.StreamsConfigurator
import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration
import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
-import org.scalatest.FlatSpec
+import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import org.testng.annotations.Test
import scala.io.Source
@@ -63,9 +67,11 @@ class FlinkTwitterUserInformationPipelineIT {
eventually (timeout(30 seconds), interval(1 seconds)) {
assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath)))
- assert(
- Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size
- > 500)
+ val lines = Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.toList
+ assert(lines.size > 500)
+ lines foreach {
+ line => assert( line.contains("created_at") )
+ }
}
}