You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/06/08 07:40:39 UTC

[flink] branch master updated: [FLINK-27910][connector/filesystem] Register the timer to enforce rolling policy when file sink starts from scratch.

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d036c23c0e5 [FLINK-27910][connector/filesystem] Register the timer to enforce rolling policy when file sink starts from scratch.
d036c23c0e5 is described below

commit d036c23c0e5c079eaafef250a5a14b7f3eead8f1
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jun 7 11:58:44 2022 +0800

    [FLINK-27910][connector/filesystem] Register the timer to enforce rolling policy when file sink starts from scratch.
    
    This closes #19889.
---
 flink-connectors/flink-connector-files/pom.xml     |  8 ++++
 .../apache/flink/connector/file/sink/FileSink.java |  4 +-
 .../flink/connector/file/sink/FileSinkTest.java    | 46 ++++++++++++++++++++++
 3 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-files/pom.xml b/flink-connectors/flink-connector-files/pom.xml
index 6ea044c6b80..96c507096a9 100644
--- a/flink-connectors/flink-connector-files/pom.xml
+++ b/flink-connectors/flink-connector-files/pom.xml
@@ -89,6 +89,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-base</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-test-utils</artifactId>
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
index 8fdf987b11f..236582767e3 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
@@ -140,11 +140,11 @@ public class FileSink<IN>
 
     @Override
     public FileWriter<IN> createWriter(InitContext context) throws IOException {
-        return bucketsBuilder.createWriter(context);
+        return restoreWriter(context, Collections.emptyList());
     }
 
     @Override
-    public StatefulSinkWriter<IN, FileWriterBucketState> restoreWriter(
+    public FileWriter<IN> restoreWriter(
             InitContext context, Collection<FileWriterBucketState> recoveredState)
             throws IOException {
         FileWriter<IN> writer = bucketsBuilder.createWriter(context);
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkTest.java
new file mode 100644
index 00000000000..516a071dfb7
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
+import org.apache.flink.connector.file.sink.utils.PartSizeAndCheckpointRollingPolicy;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link FileSink}: a writer created from scratch must register its timer. */
+public class FileSinkTest {
+
+    @Test
+    public void testCreateFileWriterWithTimerRegistered() throws IOException {
+        TestSinkInitContext ctx = new TestSinkInitContext();
+        FileSink<Integer> sink =
+                FileSink.forRowFormat(
+                                new Path("mock"), new IntegerFileSinkTestDataUtils.IntEncoder())
+                        .withRollingPolicy(new PartSizeAndCheckpointRollingPolicy<>(1024, true))
+                        .build();
+        sink.createWriter(ctx); // no restored state: the sink starts from scratch
+        assertEquals(1, ctx.getTestProcessingTimeService().getNumActiveTimers());
+    }
+}