Posted to commits@flink.apache.org by ma...@apache.org on 2022/11/29 12:14:17 UTC

[flink] branch master updated: [FLINK-29092][Connectors/Test] Use one bucket assigner

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ed46cb2fd64 [FLINK-29092][Connectors/Test] Use one bucket assigner
ed46cb2fd64 is described below

commit ed46cb2fd64f1cb306ae5b7654d2b4d64ab69f22
Author: kurt <di...@163.com>
AuthorDate: Mon Sep 5 17:43:03 2022 +0800

    [FLINK-29092][Connectors/Test] Use one bucket assigner
---
 .../hadoop/bulk/HadoopPathBasedPartFileWriterTest.java    | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)
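
For context on the swap below: BasePathBucketAssigner always returns the empty
bucket id, so every element is written directly under the sink's base path and
the test sees exactly one flat directory, while DateTimeBucketAssigner derives
the bucket id from the current processing time ("yyyy-MM-dd--HH" by default)
and can therefore split the test data across two buckets when a run straddles
an hour boundary. A minimal sketch of the difference, assuming Flink's
BucketAssigner API (the Context stub is illustrative):

    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

    public class BucketAssignerSketch {
        public static void main(String[] args) {
            // Minimal Context stub; the assigners need nothing beyond processing time.
            BucketAssigner.Context ctx =
                    new BucketAssigner.Context() {
                        @Override
                        public long currentProcessingTime() {
                            return System.currentTimeMillis();
                        }

                        @Override
                        public long currentWatermark() {
                            return Long.MIN_VALUE;
                        }

                        @Override
                        public Long timestamp() {
                            return null;
                        }
                    };

            // Empty bucket id: part files land directly under the base path.
            System.out.println(
                    "'" + new BasePathBucketAssigner<String>().getBucketId("x", ctx) + "'");

            // Time-based bucket id, e.g. "2022-09-05--17": one subdirectory per hour.
            System.out.println(new DateTimeBucketAssigner<String>().getBucketId("x", ctx));
        }
    }
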

diff --git a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
index d2e427baae7..80ee85cd40c 100644
--- a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
+++ b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/formats/hadoop/bulk/HadoopPathBasedPartFileWriterTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
 import org.apache.flink.streaming.api.functions.sink.filesystem.TestStreamingFileSinkFactory;
-import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.util.AbstractTestBase;
 
@@ -79,16 +79,18 @@ public class HadoopPathBasedPartFileWriterTest extends AbstractTestBase {
         env.setParallelism(1);
         env.enableCheckpointing(100);
 
+        // FiniteTestSource will generate two elements with a checkpoint trigger in between the two
+        // elements
         DataStream<String> stream =
                 env.addSource(new FiniteTestSource<>(data), TypeInformation.of(String.class));
         Configuration configuration = new Configuration();
-
+        // Elements from source are going to be assigned to one bucket
         HadoopPathBasedBulkFormatBuilder<String, String, ?> builder =
                 new HadoopPathBasedBulkFormatBuilder<>(
                         basePath,
                         new TestHadoopPathBasedBulkWriterFactory(),
                         configuration,
-                        new DateTimeBucketAssigner<>());
+                        new BasePathBucketAssigner<>());
         TestStreamingFileSinkFactory<String> streamingFileSinkFactory =
                 new TestStreamingFileSinkFactory<>();
         stream.addSink(streamingFileSinkFactory.createSink(builder, 1000));
@@ -102,14 +104,9 @@ public class HadoopPathBasedPartFileWriterTest extends AbstractTestBase {
     private void validateResult(List<String> expected, Configuration config, Path basePath)
             throws IOException {
         FileSystem fileSystem = FileSystem.get(basePath.toUri(), config);
-        FileStatus[] buckets = fileSystem.listStatus(basePath);
-        assertThat(buckets).isNotNull();
-        assertThat(buckets).hasSize(1);
-
-        FileStatus[] partFiles = fileSystem.listStatus(buckets[0].getPath());
+        FileStatus[] partFiles = fileSystem.listStatus(basePath);
         assertThat(partFiles).isNotNull();
         assertThat(partFiles).hasSize(2);
-
         for (FileStatus partFile : partFiles) {
             assertThat(partFile.getLen()).isGreaterThan(0);
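
With a single bucket, the part files sit directly under basePath, which is why
the intermediate bucket-directory listing could be dropped above. The two
expected part files line up with the comment added earlier in the test:
FiniteTestSource triggers a checkpoint between its emissions, and bulk-format
writers roll the in-progress file on each checkpoint, so a layout along the
lines of basePath/part-0-0 and basePath/part-0-1 (illustrative names; the
actual ones depend on OutputFileConfig) is what the simplified assertions
check.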