You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/02/16 17:26:28 UTC
[flink] 06/06: [FLINK-25583][connectors/filesystem] Add IT cases for compaction in FileSink.
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit aec2d38710a67d90bd819bfdce66b5a5a646a882
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Feb 16 18:38:48 2022 +0800
[FLINK-25583][connectors/filesystem] Add IT cases for compaction in FileSink.
This closes #18680.
---
.../file/sink/BatchCompactingFileSinkITCase.java | 69 ++++
.../file/sink/FileSinkCompactionSwitchITCase.java | 391 +++++++++++++++++++++
.../flink/connector/file/sink/FileSinkITBase.java | 4 +-
.../sink/StreamingCompactingFileSinkITCase.java | 69 ++++
4 files changed, 532 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java
new file mode 100644
index 0000000..5167e97
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchCompactingFileSinkITCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntDecoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Rule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests the compaction of the {@link FileSink} in BATCH mode.
+ *
+ * <p>This class declares no test methods of its own; it only overrides {@link #createFileSink} so
+ * that the sink under test is built with compaction enabled.
+ */
+@RunWith(Parameterized.class)
+public class BatchCompactingFileSinkITCase extends BatchExecutionFileSinkITCase {
+
+    private static final int PARALLELISM = 4;
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
+
+    /**
+     * Builds a row-format sink writing to {@code path} with compaction enabled.
+     *
+     * @param path the base directory the sink writes into
+     * @return a sink using a modulo bucket assigner, a 1024 part-size rolling policy, a
+     *     record-wise compactor and a compaction strategy with a size threshold of 10000
+     */
+    @Override
+    protected FileSink<Integer> createFileSink(String path) {
+        final FileCompactStrategy compactStrategy =
+                FileCompactStrategy.Builder.newBuilder().setSizeThreshold(10000).build();
+        final FileCompactor compactor =
+                new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
+
+        return FileSink.forRowFormat(new Path(path), new IntegerFileSinkTestDataUtils.IntEncoder())
+                .withBucketAssigner(
+                        new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
+                .withRollingPolicy(new PartSizeAndCheckpointRollingPolicy(1024))
+                .enableCompact(compactStrategy, compactor)
+                .build();
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java
new file mode 100644
index 0000000..4a7c0f0
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.connector.file.sink.FileSink.DefaultRowFormatBuilder;
+import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntDecoder;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntEncoder;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.ModuloBucketAssigner;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests of switching on or off compaction for the {@link FileSink}. */
+@RunWith(Parameterized.class)
+public class FileSinkCompactionSwitchITCase {
+
+ private static final int PARALLELISM = 4;
+
+ @Rule
+ public final MiniClusterWithClientResource miniClusterResource =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .withHaLeadershipControl()
+ .build());
+
+ protected static final int NUM_SOURCES = 4;
+
+ protected static final int NUM_SINKS = 3;
+
+ protected static final int NUM_RECORDS = 10000;
+
+ protected static final int NUM_BUCKETS = 4;
+
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+ private static final Map<String, CountDownLatch> LATCH_MAP = new ConcurrentHashMap<>();
+
+ private String latchId;
+
+ @Parameterized.Parameter public boolean isOnToOff;
+
+ @Parameterized.Parameters(name = "isOnToOff = {0}")
+ public static Collection<Object[]> params() {
+ return Arrays.asList(new Object[] {false}, new Object[] {true});
+ }
+
+ @Before
+ public void setup() {
+ this.latchId = UUID.randomUUID().toString();
+ // Wait for 3 checkpoints to ensure that the coordinator and all compactors have state
+ LATCH_MAP.put(latchId, new CountDownLatch(NUM_SOURCES * 3));
+ }
+
+ /** Removes the latch registered under this test's {@code latchId} from the static latch map. */
+ @After
+ public void teardown() {
+ LATCH_MAP.remove(latchId);
+ }
+
+    /**
+     * Runs one job, takes a savepoint from it, and restores a second job with the opposite
+     * compaction setting from that savepoint, then verifies the files written under the shared
+     * output path.
+     *
+     * <p>The first job uses {@code compactionEnabled = isOnToOff} and a non-finite source; the
+     * restoring job uses {@code compactionEnabled = !isOnToOff} and a finite source.
+     *
+     * @throws Exception if the cluster, either job, or the output verification fails
+     */
+    @Test
+    public void testSwitchingCompaction() throws Exception {
+        String path = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap =
+                sharedObjects.add(new ConcurrentHashMap<>());
+        JobGraph jobGraph = createJobGraph(path, isOnToOff, false, sendCountMap);
+        JobGraph restoringJobGraph = createJobGraph(path, !isOnToOff, true, sendCountMap);
+
+        final Configuration config = new Configuration();
+        config.setString(RestOptions.BIND_PORT, "18081-19000");
+        final MiniClusterConfiguration cfg =
+                new MiniClusterConfiguration.Builder()
+                        .setNumTaskManagers(1)
+                        // Use the class constant rather than a literal so that the slot count
+                        // cannot drift from the declared parallelism.
+                        .setNumSlotsPerTaskManager(PARALLELISM)
+                        .setConfiguration(config)
+                        .build();
+
+        try (MiniCluster miniCluster = new MiniCluster(cfg)) {
+            miniCluster.start();
+            miniCluster.submitJob(jobGraph);
+
+            // Block until the sources of the first job have counted the latch down to zero.
+            LATCH_MAP.get(latchId).await();
+
+            // NOTE(review): the boolean argument is presumably "cancel the job after the
+            // savepoint" - confirm against MiniCluster#triggerSavepoint.
+            String savepointPath =
+                    miniCluster
+                            .triggerSavepoint(
+                                    jobGraph.getJobID(),
+                                    TEMPORARY_FOLDER.newFolder().getAbsolutePath(),
+                                    true,
+                                    SavepointFormatType.CANONICAL)
+                            .get();
+
+            // We wait for two successful checkpoints in sources before shutting down. This ensures
+            // that the sink can commit its data.
+            LATCH_MAP.put(latchId, new CountDownLatch(NUM_SOURCES * 2));
+
+            restoringJobGraph.setSavepointRestoreSettings(
+                    SavepointRestoreSettings.forPath(savepointPath, false));
+            miniCluster.executeJobBlocking(restoringJobGraph);
+        }
+
+        checkIntegerSequenceSinkOutput(path, sendCountMap.get(), NUM_BUCKETS, NUM_SOURCES);
+    }
+
+    /**
+     * Builds the job graph of a STREAMING-mode pipeline consisting of a {@link CountingTestSource}
+     * feeding a {@link FileSink}.
+     *
+     * @param path the base directory the sink writes into
+     * @param compactionEnabled whether the sink is built with compaction enabled
+     * @param isFinite passed to the source; see {@link CountingTestSource}
+     * @param sendCountMap shared map the sources report their progress into
+     * @return the job graph of the assembled pipeline
+     */
+    private JobGraph createJobGraph(
+            String path,
+            boolean compactionEnabled,
+            boolean isFinite,
+            SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap) {
+        Configuration runtimeModeConfig = new Configuration();
+        runtimeModeConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.configure(runtimeModeConfig, getClass().getClassLoader());
+        env.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        CountingTestSource source =
+                new CountingTestSource(latchId, NUM_RECORDS, isFinite, sendCountMap);
+        FileSink<Integer> sink = createFileSink(path, compactionEnabled);
+
+        env.addSource(source)
+                .setParallelism(NUM_SOURCES)
+                .sinkTo(sink)
+                .uid("sink")
+                .setParallelism(NUM_SINKS);
+
+        return env.getStreamGraph().getJobGraph();
+    }
+
+    /**
+     * Builds a row-format sink writing to {@code path}, optionally with compaction enabled.
+     *
+     * @param path the base directory the sink writes into
+     * @param compactionEnabled whether to call {@code enableCompact} on the builder
+     * @return the configured sink
+     */
+    private FileSink<Integer> createFileSink(String path, boolean compactionEnabled) {
+        DefaultRowFormatBuilder<Integer> plainBuilder =
+                FileSink.forRowFormat(new Path(path), new IntEncoder())
+                        .withBucketAssigner(new ModuloBucketAssigner(NUM_BUCKETS))
+                        .withRollingPolicy(
+                                new FileSinkITBase.PartSizeAndCheckpointRollingPolicy(1024));
+
+        if (!compactionEnabled) {
+            return plainBuilder.build();
+        }
+
+        DefaultRowFormatBuilder<Integer> compactingBuilder =
+                plainBuilder.enableCompact(createFileCompactStrategy(), createFileCompactor());
+        return compactingBuilder.build();
+    }
+
+    /** Creates a record-wise compactor whose reader decodes records with {@link IntDecoder}. */
+    private static FileCompactor createFileCompactor() {
+        FileCompactor recordWiseCompactor =
+                new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
+        return recordWiseCompactor;
+    }
+
+    /**
+     * Creates the compaction strategy used by this test, built with {@code
+     * enableCompactionOnCheckpoint(2)}.
+     */
+    private static FileCompactStrategy createFileCompactStrategy() {
+        FileCompactStrategy onCheckpointStrategy =
+                FileCompactStrategy.Builder.newBuilder().enableCompactionOnCheckpoint(2).build();
+        return onCheckpointStrategy;
+    }
+
+ private static void checkIntegerSequenceSinkOutput(
+ String path, Map<Integer, Integer> countMap, int numBuckets, int numSources)
+ throws Exception {
+ assertEquals(numSources, countMap.size());
+
+ File dir = new File(path);
+ String[] subDirNames = dir.list();
+ assertNotNull(subDirNames);
+
+ Arrays.sort(subDirNames, Comparator.comparingInt(Integer::parseInt));
+ assertEquals(numBuckets, subDirNames.length);
+ for (int i = 0; i < numBuckets; ++i) {
+ assertEquals(Integer.toString(i), subDirNames[i]);
+
+ // now check its content
+ File bucketDir = new File(path, subDirNames[i]);
+ assertTrue(
+ bucketDir.getAbsolutePath() + " Should be a existing directory",
+ bucketDir.isDirectory());
+
+ Map<Integer, Integer> counts = new HashMap<>();
+ File[] files = bucketDir.listFiles(f -> !f.getName().startsWith("."));
+ assertNotNull(files);
+
+ for (File file : files) {
+ assertTrue(file.isFile());
+
+ try (DataInputStream dataInputStream =
+ new DataInputStream(new FileInputStream(file))) {
+ while (true) {
+ int value = dataInputStream.readInt();
+ counts.compute(value, (k, v) -> v == null ? 1 : v + 1);
+ }
+ } catch (EOFException e) {
+ // End the reading
+ }
+ }
+
+ int bucketId = i;
+ int expectedCount =
+ countMap.values().stream()
+ .map(
+ numRecords ->
+ numRecords / numBuckets
+ + (bucketId < numRecords % numBuckets ? 1 : 0))
+ .mapToInt(num -> num)
+ .max()
+ .getAsInt();
+ assertEquals(expectedCount, counts.size());
+
+ List<Integer> countList = new ArrayList<>(countMap.values());
+ Collections.sort(countList);
+ for (int j = 0; j < countList.size(); j++) {
+ int rangeFrom = j == 0 ? 0 : countList.get(j - 1);
+ rangeFrom =
+ bucketId
+ + (rangeFrom % numBuckets == 0
+ ? rangeFrom
+ : (rangeFrom + numBuckets - rangeFrom % numBuckets));
+ int rangeTo = countList.get(j);
+ for (int k = rangeFrom; k < rangeTo; k += numBuckets) {
+ assertEquals(
+ "The record "
+ + k
+ + " should occur "
+ + (numBuckets - j)
+ + " times, "
+ + " but only occurs "
+ + counts.getOrDefault(k, 0)
+ + "time",
+ numBuckets - j,
+ counts.getOrDefault(k, 0).intValue());
+ }
+ }
+ }
+ }
+
+ /**
+ * Parallel test source that emits consecutive integers starting from {@code nextValue}.
+ *
+ * <p>On every snapshot it stores {@code nextValue} in operator list state and publishes it to
+ * the shared {@code sendCountMap} under its subtask index. On completed checkpoints it counts
+ * down the latch registered in {@code LATCH_MAP} under {@code latchId}; {@link #run} blocks on
+ * that latch after it stops emitting.
+ */
+ private static class CountingTestSource extends RichParallelSourceFunction<Integer>
+ implements CheckpointListener, CheckpointedFunction {
+
+ // Key of this test's latch in the static LATCH_MAP.
+ private final String latchId;
+
+ // Number of records a finite source emits beyond its starting position.
+ private final int numberOfRecords;
+
+ // If false, the source keeps emitting until cancelled or Integer.MAX_VALUE is reached.
+ private final boolean isFinite;
+
+ private final SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap;
+
+ private ListState<Integer> nextValueState;
+
+ // The next integer to emit; restored from nextValueState when state is present.
+ private int nextValue;
+
+ private volatile boolean isCanceled;
+
+ // Set once a snapshot has been taken after run() stopped emitting records.
+ private volatile boolean snapshottedAfterAllRecordsOutput;
+
+ // Set by run() once it has stopped emitting and is about to wait on the latch.
+ private volatile boolean isWaitingCheckpointComplete;
+
+ public CountingTestSource(
+ String latchId,
+ int numberOfRecords,
+ boolean isFinite,
+ SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap) {
+ this.latchId = latchId;
+ this.numberOfRecords = numberOfRecords;
+ this.isFinite = isFinite;
+ this.sendCountMap = sendCountMap;
+ }
+
+ /** Obtains the "nextValue" list state and, if it holds an element, resumes from it. */
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ nextValueState =
+ context.getOperatorStateStore()
+ .getListState(new ListStateDescriptor<>("nextValue", Integer.class));
+
+ if (nextValueState.get() != null && nextValueState.get().iterator().hasNext()) {
+ nextValue = nextValueState.get().iterator().next();
+ }
+ }
+
+ /** Emits records up to the target value, then blocks on the shared latch. */
+ @Override
+ public void run(SourceContext<Integer> ctx) throws Exception {
+ // A finite source emits numberOfRecords further records from its current position; a
+ // non-finite source emits until it is cancelled (or Integer.MAX_VALUE is reached).
+ sendRecordsUntil(isFinite ? (nextValue + numberOfRecords) : Integer.MAX_VALUE, ctx);
+
+ // Wait the last checkpoint to commit all the pending records.
+ isWaitingCheckpointComplete = true;
+ CountDownLatch latch = LATCH_MAP.get(latchId);
+ latch.await();
+ }
+
+ /** Emits one record at a time under the checkpoint lock until cancelled or done. */
+ private void sendRecordsUntil(int targetNumber, SourceContext<Integer> ctx) {
+ while (!isCanceled && nextValue < targetNumber) {
+ synchronized (ctx.getCheckpointLock()) {
+ ctx.collect(nextValue++);
+ }
+ }
+ }
+
+ /** Persists {@code nextValue} and publishes it to the shared map for this subtask. */
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ nextValueState.update(Collections.singletonList(nextValue));
+ sendCountMap.consumeSync(
+ m -> m.put(getRuntimeContext().getIndexOfThisSubtask(), nextValue));
+
+ if (isWaitingCheckpointComplete) {
+ snapshottedAfterAllRecordsOutput = true;
+ }
+ }
+
+ /**
+ * Counts the latch down on every completed checkpoint for a non-finite source; for a finite
+ * source only once a snapshot has been taken after it stopped emitting.
+ */
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ if (!isFinite || (isWaitingCheckpointComplete && snapshottedAfterAllRecordsOutput)) {
+ CountDownLatch latch = LATCH_MAP.get(latchId);
+ latch.countDown();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isCanceled = true;
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
index 6c424ec..50f8107 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkITBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.util.TestLogger;
@@ -94,7 +95,8 @@ public abstract class FileSinkITBase extends TestLogger {
.build();
}
- private static class PartSizeAndCheckpointRollingPolicy
+ /** The testing {@link RollingPolicy} based on maximum file size. */
+ protected static class PartSizeAndCheckpointRollingPolicy
extends CheckpointRollingPolicy<Integer, String> {
private final long maxPartSize;
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java
new file mode 100644
index 0000000..227d49a
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingCompactingFileSinkITCase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
+import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntDecoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Rule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests the compaction of the {@link FileSink} in STREAMING mode.
+ *
+ * <p>This class declares no test methods of its own; it only overrides {@link #createFileSink} so
+ * that the sink under test is built with compaction enabled.
+ */
+@RunWith(Parameterized.class)
+public class StreamingCompactingFileSinkITCase extends StreamingExecutionFileSinkITCase {
+
+    private static final int PARALLELISM = 4;
+
+    @Rule
+    public final MiniClusterWithClientResource miniClusterResource =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                            .withHaLeadershipControl()
+                            .build());
+
+    /**
+     * Builds a row-format sink writing to {@code path} with compaction enabled.
+     *
+     * @param path the base directory the sink writes into
+     * @return a sink using a modulo bucket assigner, a 1024 part-size rolling policy, a
+     *     record-wise compactor and a compaction strategy with a size threshold of 10000
+     */
+    @Override
+    protected FileSink<Integer> createFileSink(String path) {
+        final FileCompactStrategy compactStrategy =
+                FileCompactStrategy.Builder.newBuilder().setSizeThreshold(10000).build();
+        final FileCompactor compactor =
+                new RecordWiseFileCompactor<>(new DecoderBasedReader.Factory<>(IntDecoder::new));
+
+        return FileSink.forRowFormat(new Path(path), new IntegerFileSinkTestDataUtils.IntEncoder())
+                .withBucketAssigner(
+                        new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
+                .withRollingPolicy(new PartSizeAndCheckpointRollingPolicy(1024))
+                .enableCompact(compactStrategy, compactor)
+                .build();
+    }
+}