Posted to commits@flink.apache.org by zh...@apache.org on 2020/09/09 14:17:41 UTC

[flink] branch release-1.11 updated: [FLINK-19166][table-runtime] StreamingFileWriter should register Listener before the initialization of buckets

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 4c85ea5  [FLINK-19166][table-runtime] StreamingFileWriter should register Listener before the initialization of buckets
4c85ea5 is described below

commit 4c85ea51270873d99714b42108052474a3b0ec3f
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Sep 9 12:25:15 2020 +0800

    [FLINK-19166][table-runtime] StreamingFileWriter should register Listener before the initialization of buckets
---
 .../filesystem/stream/StreamingFileWriter.java     |  17 ++-
 .../filesystem/stream/StreamingFileWriterTest.java | 161 +++++++++++++++++++++
 2 files changed, 170 insertions(+), 8 deletions(-)
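
Why the ordering matters: StreamingFileSinkHelper's constructor restores the
Buckets state when the job is recovering, and that restore can already report
recovered buckets as inactive. If no BucketLifeCycleListener is registered at
that point, the notification is silently dropped and the corresponding
partition is never handed to the committer. Below is a hypothetical,
self-contained illustration of the pattern (plain Java, no Flink
dependencies; every name in it is invented for the demo and is not the actual
Flink API):

	import java.util.ArrayList;
	import java.util.Arrays;
	import java.util.List;

	public class ListenerOrderingDemo {

		interface LifeCycleListener {
			void onInactive(String bucketId);
		}

		static class DemoBuckets {
			private LifeCycleListener listener; // null until registered

			void setListener(LifeCycleListener listener) {
				this.listener = listener;
			}

			// Mirrors the restore step: buckets recovered from state that
			// are no longer active are reported to the listener, if one is
			// set at that moment.
			void restoreFromState(List<String> inactiveBucketIds) {
				for (String id : inactiveBucketIds) {
					if (listener != null) {
						listener.onInactive(id);
					}
					// otherwise the notification is silently lost
				}
			}
		}

		public static void main(String[] args) {
			List<String> seen = new ArrayList<>();

			// Broken ordering: restore runs before the listener exists.
			DemoBuckets broken = new DemoBuckets();
			broken.restoreFromState(Arrays.asList("p1"));
			broken.setListener(seen::add); // registered too late
			System.out.println("broken: " + seen); // prints: broken: []

			seen.clear();

			// Fixed ordering: register the listener, then restore.
			DemoBuckets fixed = new DemoBuckets();
			fixed.setListener(seen::add);
			fixed.restoreFromState(Arrays.asList("p1"));
			System.out.println("fixed: " + seen); // prints: fixed: [p1]
		}
	}

The diff below applies exactly this reordering inside initializeState().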

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
index c2186bf..924ef98 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
@@ -79,17 +79,10 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
 	public void initializeState(StateInitializationContext context) throws Exception {
 		super.initializeState(context);
 		buckets = bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
-		helper = new StreamingFileSinkHelper<>(
-				buckets,
-				context.isRestored(),
-				context.getOperatorStateStore(),
-				getRuntimeContext().getProcessingTimeService(),
-				bucketCheckInterval);
 
+		// Set the listener before the initialization of Buckets.
 		inactivePartitions = new HashSet<>();
-		currentWatermark = Long.MIN_VALUE;
 		buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {
-
 			@Override
 			public void bucketCreated(Bucket<RowData, String> bucket) {
 			}
@@ -99,6 +92,14 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
 				inactivePartitions.add(bucket.getBucketId());
 			}
 		});
+
+		helper = new StreamingFileSinkHelper<>(
+				buckets,
+				context.isRestored(),
+				context.getOperatorStateStore(),
+				getRuntimeContext().getProcessingTimeService(),
+				bucketCheckInterval);
+		currentWatermark = Long.MIN_VALUE;
 	}
 
 	@Override
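
(For context, abridged from the Flink 1.11 sources and simplified: the
restore performed inside the StreamingFileSinkHelper constructor walks the
recovered bucket states and, for every restored bucket that is no longer
active, notifies the registered BucketLifeCycleListener; see
Buckets#initializeState in flink-streaming-java. Constructing the helper
after setBucketLifeCycleListener therefore guarantees the listener observes
those restore-time notifications, which is what the new test below verifies.)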
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
new file mode 100644
index 0000000..6ea42af
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test for {@link StreamingFileWriter}.
+ */
+public class StreamingFileWriterTest {
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	private Path path;
+
+	@Before
+	public void before() throws IOException {
+		File file = TEMPORARY_FOLDER.newFile();
+		file.delete();
+		path = new Path(file.toURI());
+	}
+
+	@Test
+	public void testFailover() throws Exception {
+		OperatorSubtaskState state;
+		try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
+			harness.setup();
+			harness.initializeEmptyState();
+			harness.open();
+			harness.processElement(row("1"), 0);
+			harness.processElement(row("2"), 0);
+			harness.processElement(row("2"), 0);
+			state = harness.snapshot(1, 1);
+			harness.processElement(row("3"), 0);
+			harness.processElement(row("4"), 0);
+			harness.notifyOfCompletedCheckpoint(1);
+			List<String> partitions = collect(harness);
+			Assert.assertEquals(Arrays.asList("1", "2"), partitions);
+		}
+
+		// first restore: no records for partitions {1, 2} are replayed
+		try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
+			harness.setup();
+			harness.initializeState(state);
+			harness.open();
+			harness.processElement(row("3"), 0);
+			harness.processElement(row("4"), 0);
+			state = harness.snapshot(2, 2);
+			harness.notifyOfCompletedCheckpoint(2);
+			List<String> partitions = collect(harness);
+			Assert.assertEquals(Arrays.asList("1", "2", "3", "4"), partitions);
+		}
+
+		// second restore: the record for partition {4} is replayed
+		try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
+			harness.setup();
+			harness.initializeState(state);
+			harness.open();
+			harness.processElement(row("4"), 0);
+			harness.processElement(row("5"), 0);
+			state = harness.snapshot(3, 3);
+			harness.notifyOfCompletedCheckpoint(3);
+			List<String> partitions = collect(harness);
+			Assert.assertEquals(Arrays.asList("3", "4", "5"), partitions);
+		}
+
+		// third restore: multiple snapshots, only checkpoint 5 completes
+		try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
+			harness.setup();
+			harness.initializeState(state);
+			harness.open();
+			harness.processElement(row("6"), 0);
+			harness.processElement(row("7"), 0);
+			harness.snapshot(4, 4);
+			harness.processElement(row("8"), 0);
+			harness.snapshot(5, 5);
+			harness.processElement(row("9"), 0);
+			harness.snapshot(6, 6);
+			harness.notifyOfCompletedCheckpoint(5);
+			List<String> partitions = collect(harness);
+			// should not contain partition {9}: checkpoint 6 never completed
+			Assert.assertEquals(Arrays.asList("4", "5", "6", "7", "8"), partitions);
+		}
+	}
+
+	private static RowData row(String s) {
+		return GenericRowData.of(StringData.fromString(s));
+	}
+
+	private static List<String> collect(
+			OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness) {
+		List<String> parts = new ArrayList<>();
+		harness.extractOutputValues().forEach(m -> parts.addAll(m.partitions));
+		return parts;
+	}
+
+	private OneInputStreamOperatorTestHarness<RowData, CommitMessage> create() throws Exception {
+		StreamingFileWriter writer = new StreamingFileWriter(1000, StreamingFileSink.forRowFormat(
+				path, (Encoder<RowData>) (element, stream) ->
+						stream.write((element.getString(0) + "\n").getBytes(StandardCharsets.UTF_8)))
+				.withBucketAssigner(new BucketAssigner<RowData, String>() {
+					@Override
+					public String getBucketId(RowData element, Context context) {
+						return element.getString(0).toString();
+					}
+
+					@Override
+					public SimpleVersionedSerializer<String> getSerializer() {
+						return SimpleVersionedStringSerializer.INSTANCE;
+					}
+				})
+				.withRollingPolicy(OnCheckpointRollingPolicy.build()));
+		OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = new OneInputStreamOperatorTestHarness<>(
+				writer, 1, 1, 0);
+		harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+		return harness;
+	}
+}
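
The test walks one job incarnation per try-with-resources block: snapshot,
restore from that snapshot, replay, and assert that only partitions whose
checkpoint has been notified as complete reach the committer output (hence
partition {9}, written after snapshot 5 and covered only by the
never-completed checkpoint 6, is absent). To run just this test, a standard
Maven surefire invocation from the repository root should work (hypothetical
command line; adjust to your local build setup):

	mvn -pl flink-table/flink-table-runtime-blink test -Dtest=StreamingFileWriterTest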