You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/11/29 12:14:17 UTC
[flink] branch master updated: [FLINK-29092][Connectors/Test] Use one bucket assigner
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ed46cb2fd64 [FLINK-29092][Connectors/Test] Use one bucket assigner
ed46cb2fd64 is described below
commit ed46cb2fd64f1cb306ae5b7654d2b4d64ab69f22
Author: kurt <di...@163.com>
AuthorDate: Mon Sep 5 17:43:03 2022 +0800
[FLINK-29092][Connectors/Test] Use one bucket assigner
---
.../hadoop/bulk/HadoopPathBasedPartFileWriterTest.java | 15 ++++++---------
1 file changed, 6 insertions(+), 9 deletions(-)
diff --git a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
index d2e427baae7..80ee85cd40c 100644
--- a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
+++ b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestStreamingFileSinkFactory;
-import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.test.util.AbstractTestBase;
@@ -79,16 +79,18 @@ public class HadoopPathBasedPartFileWriterTest extends AbstractTestBase {
env.setParallelism(1);
env.enableCheckpointing(100);
+ // FiniteTestSource will generate two elements with a checkpoint trigger in between the two
+ // elements
DataStream<String> stream =
env.addSource(new FiniteTestSource<>(data), TypeInformation.of(String.class));
Configuration configuration = new Configuration();
-
+ // Elements from source are going to be assigned to one bucket
HadoopPathBasedBulkFormatBuilder<String, String, ?> builder =
new HadoopPathBasedBulkFormatBuilder<>(
basePath,
new TestHadoopPathBasedBulkWriterFactory(),
configuration,
- new DateTimeBucketAssigner<>());
+ new BasePathBucketAssigner<>());
TestStreamingFileSinkFactory<String> streamingFileSinkFactory =
new TestStreamingFileSinkFactory<>();
stream.addSink(streamingFileSinkFactory.createSink(builder, 1000));
@@ -102,14 +104,9 @@ public class HadoopPathBasedPartFileWriterTest extends AbstractTestBase {
private void validateResult(List<String> expected, Configuration config, Path basePath)
throws IOException {
FileSystem fileSystem = FileSystem.get(basePath.toUri(), config);
- FileStatus[] buckets = fileSystem.listStatus(basePath);
- assertThat(buckets).isNotNull();
- assertThat(buckets).hasSize(1);
-
- FileStatus[] partFiles = fileSystem.listStatus(buckets[0].getPath());
+ FileStatus[] partFiles = fileSystem.listStatus(basePath);
assertThat(partFiles).isNotNull();
assertThat(partFiles).hasSize(2);
-
for (FileStatus partFile : partFiles) {
assertThat(partFile.getLen()).isGreaterThan(0);