You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/09/09 14:17:41 UTC
[flink] branch release-1.11 updated: [FLINK-19166][table-runtime]
StreamingFileWriter should register Listener before the initialization of
buckets
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 4c85ea5 [FLINK-19166][table-runtime] StreamingFileWriter should register Listener before the initialization of buckets
4c85ea5 is described below
commit 4c85ea51270873d99714b42108052474a3b0ec3f
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed Sep 9 12:25:15 2020 +0800
[FLINK-19166][table-runtime] StreamingFileWriter should register Listener before the initialization of buckets
---
.../filesystem/stream/StreamingFileWriter.java | 17 ++-
.../filesystem/stream/StreamingFileWriterTest.java | 161 +++++++++++++++++++++
2 files changed, 170 insertions(+), 8 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
index c2186bf..924ef98 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
@@ -79,17 +79,10 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
buckets = bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());
- helper = new StreamingFileSinkHelper<>(
- buckets,
- context.isRestored(),
- context.getOperatorStateStore(),
- getRuntimeContext().getProcessingTimeService(),
- bucketCheckInterval);
+ // Set listener before the initialization of Buckets.
inactivePartitions = new HashSet<>();
- currentWatermark = Long.MIN_VALUE;
buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {
-
@Override
public void bucketCreated(Bucket<RowData, String> bucket) {
}
@@ -99,6 +92,14 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
inactivePartitions.add(bucket.getBucketId());
}
});
+
+ helper = new StreamingFileSinkHelper<>(
+ buckets,
+ context.isRestored(),
+ context.getOperatorStateStore(),
+ getRuntimeContext().getProcessingTimeService(),
+ bucketCheckInterval);
+ currentWatermark = Long.MIN_VALUE;
}
@Override
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
new file mode 100644
index 0000000..6ea42af
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test for {@link StreamingFileWriter}.
+ */
+public class StreamingFileWriterTest {
+
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ private Path path;
+
+ @Before
+ public void before() throws IOException {
+ File file = TEMPORARY_FOLDER.newFile();
+ file.delete();
+ path = new Path(file.toURI());
+ }
+
+ @Test
+ public void testFailover() throws Exception {
+ OperatorSubtaskState state;
+ try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
+ harness.setup();
+ harness.initializeEmptyState();
+ harness.open();
+ harness.processElement(row("1"), 0);
+ harness.processElement(row("2"), 0);
+ harness.processElement(row("2"), 0);
+ state = harness.snapshot(1, 1);
+ harness.processElement(row("3"), 0);
+ harness.processElement(row("4"), 0);
+ harness.notifyOfCompletedCheckpoint(1);
+ List<String> partitions = collect(harness);
+ Assert.assertEquals(Arrays.asList("1", "2"), partitions);
+ }
+
+ // first retry, no records for partitions {1, 2}
+ try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
+ harness.setup();
+ harness.initializeState(state);
+ harness.open();
+ harness.processElement(row("3"), 0);
+ harness.processElement(row("4"), 0);
+ state = harness.snapshot(2, 2);
+ harness.notifyOfCompletedCheckpoint(2);
+ List<String> partitions = collect(harness);
+ Assert.assertEquals(Arrays.asList("1", "2", "3", "4"), partitions);
+ }
+
+ // second retry, partition {4} is repeated
+ try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
+ harness.setup();
+ harness.initializeState(state);
+ harness.open();
+ harness.processElement(row("4"), 0);
+ harness.processElement(row("5"), 0);
+ state = harness.snapshot(3, 3);
+ harness.notifyOfCompletedCheckpoint(3);
+ List<String> partitions = collect(harness);
+ Assert.assertEquals(Arrays.asList("3", "4", "5"), partitions);
+ }
+
+ // third retry, multiple snapshots
+ try (OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = create()) {
+ harness.setup();
+ harness.initializeState(state);
+ harness.open();
+ harness.processElement(row("6"), 0);
+ harness.processElement(row("7"), 0);
+ harness.snapshot(4, 4);
+ harness.processElement(row("8"), 0);
+ harness.snapshot(5, 5);
+ harness.processElement(row("9"), 0);
+ harness.snapshot(6, 6);
+ harness.notifyOfCompletedCheckpoint(5);
+ List<String> partitions = collect(harness);
+ // should not contain partition {9}
+ Assert.assertEquals(Arrays.asList("4", "5", "6", "7", "8"), partitions);
+ }
+ }
+
+ private static RowData row(String s) {
+ return GenericRowData.of(StringData.fromString(s));
+ }
+
+ private static List<String> collect(
+ OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness) {
+ List<String> parts = new ArrayList<>();
+ harness.extractOutputValues().forEach(m -> parts.addAll(m.partitions));
+ return parts;
+ }
+
+ private OneInputStreamOperatorTestHarness<RowData, CommitMessage> create() throws Exception {
+ StreamingFileWriter writer = new StreamingFileWriter(1000, StreamingFileSink.forRowFormat(
+ path, (Encoder<RowData>) (element, stream) ->
+ stream.write((element.getString(0) + "\n").getBytes(StandardCharsets.UTF_8)))
+ .withBucketAssigner(new BucketAssigner<RowData, String>() {
+ @Override
+ public String getBucketId(RowData element, Context context) {
+ return element.getString(0).toString();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<String> getSerializer() {
+ return SimpleVersionedStringSerializer.INSTANCE;
+ }
+ })
+ .withRollingPolicy(OnCheckpointRollingPolicy.build()));
+ OneInputStreamOperatorTestHarness<RowData, CommitMessage> harness = new OneInputStreamOperatorTestHarness<>(
+ writer, 1, 1, 0);
+ harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+ return harness;
+ }
+}