You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2019/07/25 15:06:11 UTC

[flink] branch master updated: [FLINK-13397] Remove RollingSink

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dd6508f  [FLINK-13397] Remove RollingSink
dd6508f is described below

commit dd6508f9f6e88947329897a3259648f3d3eee38f
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Wed Jul 24 11:13:14 2019 +0200

    [FLINK-13397] Remove RollingSink
---
 docs/dev/connectors/filesystem_sink.md             |   4 +-
 docs/dev/connectors/filesystem_sink.zh.md          |   4 +-
 docs/dev/connectors/guarantees.md                  |   2 +-
 docs/dev/connectors/guarantees.zh.md               |   2 +-
 docs/fig/program_dataflow.svg                      |   2 +-
 .../flink/streaming/connectors/fs/Bucketer.java    |  56 --
 .../streaming/connectors/fs/DateTimeBucketer.java  | 126 ---
 .../connectors/fs/NonRollingBucketer.java          |  49 -
 .../flink/streaming/connectors/fs/RollingSink.java | 918 -------------------
 .../connectors/fs/bucketing/BucketingSink.java     |  15 -
 .../fs/RollingSinkFaultToleranceITCase.java        | 301 -------
 .../streaming/connectors/fs/RollingSinkITCase.java | 981 ---------------------
 .../connectors/fs/RollingSinkSecuredITCase.java    | 253 ------
 13 files changed, 7 insertions(+), 2706 deletions(-)

diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md
index f9a828d..7e43aeb 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -1,6 +1,6 @@
 ---
-title: "HDFS Connector"
-nav-title: Rolling File Sink
+title: "Hadoop FileSystem Connector"
+nav-title: Hadoop FileSystem
 nav-parent_id: connectors
 nav-pos: 5
 ---
diff --git a/docs/dev/connectors/filesystem_sink.zh.md b/docs/dev/connectors/filesystem_sink.zh.md
index 54b0c64..9f56272 100644
--- a/docs/dev/connectors/filesystem_sink.zh.md
+++ b/docs/dev/connectors/filesystem_sink.zh.md
@@ -1,6 +1,6 @@
 ---
-title: "HDFS 连接器"
-nav-title: Rolling File Sink
+title: "Hadoop FileSystem 连接器"
+nav-title: Hadoop FileSystem
 nav-parent_id: connectors
 nav-pos: 5
 ---
diff --git a/docs/dev/connectors/guarantees.md b/docs/dev/connectors/guarantees.md
index 56eac12..cb5bffe 100644
--- a/docs/dev/connectors/guarantees.md
+++ b/docs/dev/connectors/guarantees.md
@@ -98,7 +98,7 @@ state updates) of Flink coupled with bundled sinks:
   </thead>
   <tbody>
     <tr>
-        <td>HDFS rolling sink</td>
+        <td>HDFS BucketingSink</td>
         <td>exactly once</td>
         <td>Implementation depends on Hadoop version</td>
     </tr>
diff --git a/docs/dev/connectors/guarantees.zh.md b/docs/dev/connectors/guarantees.zh.md
index 8d2e568..f5c1aec 100644
--- a/docs/dev/connectors/guarantees.zh.md
+++ b/docs/dev/connectors/guarantees.zh.md
@@ -93,7 +93,7 @@ state updates) of Flink coupled with bundled sinks:
   </thead>
   <tbody>
     <tr>
-        <td>HDFS rolling sink</td>
+        <td>HDFS BucketingSink</td>
         <td>exactly once</td>
         <td>Implementation depends on Hadoop version</td>
     </tr>
diff --git a/docs/fig/program_dataflow.svg b/docs/fig/program_dataflow.svg
index 7c1ec8d..67ce157 100644
--- a/docs/fig/program_dataflow.svg
+++ b/docs/fig/program_dataflow.svg
@@ -336,7 +336,7 @@ under the License.
          y="202.80663"
          id="text3085"
          xml:space="preserve"
-         style="font-size:13.80343914px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Courier New">RollingSink</text>
+         style="font-size:13.80343914px;font-style:normal;font-weight:normal;text-align:start;text-anchor:start;fill:#000000;font-family:Courier New">BucketingSink</text>
       <text
          x="265.36435"
          y="202.80663"
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
deleted file mode 100644
index 9caf628..0000000
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Bucketer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.hadoop.fs.Path;
-
-import java.io.Serializable;
-
-/**
- * A bucketer is used with a {@link RollingSink}
- * to put emitted elements into rolling files.
- *
- *
- * <p>The {@code RollingSink} has one active bucket that it is writing to at a time. Whenever
- * a new element arrives it will ask the {@code Bucketer} if a new bucket should be started and
- * the old one closed. The {@code Bucketer} can, for example, decide to start new buckets
- * based on system time.
- *
- * @deprecated use {@link org.apache.flink.streaming.connectors.fs.bucketing.Bucketer} instead.
- */
-@Deprecated
-public interface Bucketer extends Serializable {
-
-	/**
-	 * Returns {@code true} when a new bucket should be started.
-	 *
-	 * @param currentBucketPath The bucket {@code Path} that is currently being used.
-	 */
-	boolean shouldStartNewBucket(Path basePath, Path currentBucketPath);
-
-	/**
-	 * Returns the {@link Path} of a new bucket file.
-	 *
-	 * @param basePath The base path containing all the buckets.
-	 *
-	 * @return The complete new {@code Path} of the new bucket. This should include the {@code basePath}
-	 *      and also the {@code subtaskIndex} tp avoid clashes with parallel sinks.
-	 */
-	Path getNextBucketPath(Path basePath);
-}
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
deleted file mode 100644
index 96c743b..0000000
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/DateTimeBucketer.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-/**
- * A {@link Bucketer} that assigns to buckets based on current system time.
- *
- *
- * <p>The {@code DateTimeBucketer} will create directories of the following form:
- * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
- * that was specified as a base path when creating the
- * {@link RollingSink}. The {@code dateTimePath}
- * is determined based on the current system time and the user provided format string.
- *
- *
- * <p>{@link SimpleDateFormat} is used to derive a date string from the current system time and
- * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
- * files will have a granularity of hours.
- *
- *
- *
- * <p>Example:
- *
- * <pre>{@code
- *     Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
- * }</pre>
- *
- * <p>This will create for example the following bucket path:
- * {@code /base/1976-12-31--14/}
- *
- * @deprecated use {@link org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer} instead.
- */
-@Deprecated
-public class DateTimeBucketer implements Bucketer {
-
-	private static final Logger LOG = LoggerFactory.getLogger(DateTimeBucketer.class);
-
-	private static final long serialVersionUID = 1L;
-
-	private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
-
-	// We have this so that we can manually set it for tests.
-	private static Clock clock = new SystemClock();
-
-	private final String formatString;
-
-	private transient SimpleDateFormat dateFormatter;
-
-	/**
-	 * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
-	 */
-	public DateTimeBucketer() {
-		this(DEFAULT_FORMAT_STRING);
-	}
-
-	/**
-	 * Creates a new {@code DateTimeBucketer} with the given date/time format string.
-	 *
-	 * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
-	 *                     the bucket path.
-	 */
-	public DateTimeBucketer(String formatString) {
-		this.formatString = formatString;
-
-		this.dateFormatter = new SimpleDateFormat(formatString);
-	}
-
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-
-		this.dateFormatter = new SimpleDateFormat(formatString);
-	}
-
-	@Override
-	public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
-		String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));
-		return !(new Path(basePath, newDateTimeString).equals(currentBucketPath));
-	}
-
-	@Override
-	public Path getNextBucketPath(Path basePath) {
-		String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));
-		return new Path(basePath + "/" + newDateTimeString);
-	}
-
-	@Override
-	public String toString() {
-		return "DateTimeBucketer{" +
-				"formatString='" + formatString + '\'' +
-				'}';
-	}
-
-	/**
-	 * This sets the internal {@link Clock} implementation. This method should only be used for testing
-	 *
-	 * @param newClock The new clock to set.
-	 */
-	public static void setClock(Clock newClock) {
-		clock = newClock;
-	}
-}
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
deleted file mode 100644
index a03bcb5..0000000
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/NonRollingBucketer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer;
-
-import org.apache.hadoop.fs.Path;
-
-/**
- * A {@link Bucketer} that does not perform any
- * rolling of files. All files are written to the base path.
- *
- * @deprecated use {@link BasePathBucketer} instead.
- */
-@Deprecated
-public class NonRollingBucketer implements Bucketer {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
-		return false;
-	}
-
-	@Override
-	public Path getNextBucketPath(Path basePath) {
-		return basePath;
-	}
-
-	@Override
-	public String toString() {
-		return "NonRollingBucketer";
-	}
-}
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
deleted file mode 100644
index 831377e..0000000
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ /dev/null
@@ -1,918 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.commons.lang3.time.StopWatch;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-/**
- * Sink that emits its input elements to rolling {@link org.apache.hadoop.fs.FileSystem} files. This
- * is integrated with the checkpointing mechanism to provide exactly once semantics.
- *
- *
- * <p>When creating the sink a {@code basePath} must be specified. The base directory contains
- * one directory for every bucket. The bucket directories themselves contain several part files.
- * These contain the actual written data.
- *
- *
- * <p>The sink uses a {@link Bucketer} to determine the name of bucket directories inside the
- * base directory. Whenever the {@code Bucketer} returns a different directory name than
- * it returned before the sink will close the current part files inside that bucket
- * and start the new bucket directory. The default bucketer is a {@link DateTimeBucketer} with
- * date format string {@code ""yyyy-MM-dd--HH"}. You can specify a custom {@code Bucketer}
- * using {@link #setBucketer(Bucketer)}. For example, use
- * {@link NonRollingBucketer} if you don't want to have
- * buckets but still write part files in a fault-tolerant way.
- *
- *
- * <p>The filenames of the part files contain the part prefix, the parallel subtask index of the sink
- * and a rolling counter, for example {@code "part-1-17"}. Per default the part prefix is
- * {@code "part"} but this can be
- * configured using {@link #setPartPrefix(String)}. When a part file becomes bigger
- * than the batch size the current part file is closed, the part counter is increased and
- * a new part file is created. The batch size defaults to {@code 384MB}, this can be configured
- * using {@link #setBatchSize(long)}.
- *
- *
- * <p>Part files can be in one of three states: in-progress, pending or finished. The reason for this
- * is how the sink works together with the checkpointing mechanism to provide exactly-once semantics
- * and fault-tolerance. The part file that is currently being written to is in-progress. Once
- * a part file is closed for writing it becomes pending. When a checkpoint is successful the
- * currently pending files will be moved to finished. If a failure occurs the pending files
- * will be deleted to reset state to the last checkpoint. The data in in-progress files will
- * also have to be rolled back. If the {@code FileSystem} supports the {@code truncate} call
- * this will be used to reset the file back to a previous state. If not, a special file
- * with the same name as the part file and the suffix {@code ".valid-length"} will be written
- * that contains the length up to which the file contains valid data. When reading the file
- * it must be ensured that it is only read up to that point. The prefixes and suffixes for
- * the different file states and valid-length files can be configured, for example with
- * {@link #setPendingSuffix(String)}.
- *
- *
- * <p>Note: If checkpointing is not enabled the pending files will never be moved to the finished state.
- * In that case, the pending suffix/prefix can be set to {@code ""} to make the sink work
- * in a non-fault-tolerant way but still provide output without prefixes and suffixes.
- *
- *
- * <p>The part files are written using an instance of {@link Writer}. By default
- * {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result
- * of {@code toString()} for every element. Separated by newlines. You can configure the writer
- * using {@link #setWriter(Writer)}. For example,
- * {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter} can be used to write
- * Hadoop {@code SequenceFiles}.
- *
- *
- * <p>Example:
- *
- * <pre>{@code
- *     new RollingSink<Tuple2<IntWritable, Text>>(outPath)
- *         .setWriter(new SequenceFileWriter<IntWritable, Text>())
- *         .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")
- * }</pre>
- *
- * <p>This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
- *
- * @see DateTimeBucketer
- * @see StringWriter
- * @see SequenceFileWriter
- *
- * @param <T> Type of the elements emitted by this sink
- *
- * @deprecated use {@link BucketingSink} instead.
- */
-@Deprecated
-public class RollingSink<T> extends RichSinkFunction<T>
-		implements InputTypeConfigurable, CheckpointedFunction,
-					CheckpointListener {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(RollingSink.class);
-
-	// --------------------------------------------------------------------------------------------
-	//  User configuration values
-	// --------------------------------------------------------------------------------------------
-	// These are initialized with some defaults but are meant to be changeable by the user
-
-	/**
-	 * The default maximum size of part files (currently {@code 384 MB}).
-	 */
-	private static final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
-
-	/**
-	 * This is used for part files that we are writing to but which where not yet confirmed
-	 * by a checkpoint.
-	 */
-	private static final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
-
-	/**
-	 * See above, but for prefix.
-	 */
-	private static final String DEFAULT_IN_PROGRESS_PREFIX = "_";
-
-	/**
-	 * This is used for part files that we are not writing to but which are not yet confirmed by
-	 * checkpoint.
-	 */
-	private static final String DEFAULT_PENDING_SUFFIX = ".pending";
-
-	/**
-	 * See above, but for prefix.
-	 */
-	private static final String DEFAULT_PENDING_PREFIX = "_";
-
-	/**
-	 * When truncate() is not supported on the used FileSystem we instead write a
-	 * file along the part file with this ending that contains the length up to which
-	 * the part file is valid.
-	 */
-	private static final String DEFAULT_VALID_SUFFIX = ".valid-length";
-
-	/**
-	 * See above, but for prefix.
-	 */
-	private static final String DEFAULT_VALID_PREFIX = "_";
-
-	/**
-	 * The default prefix for part files.
-	 */
-	private static final String DEFAULT_PART_REFIX = "part";
-
-	/**
-	 * The default timeout for asynchronous operations such as recoverLease and truncate. In
-	 * milliseconds.
-	 */
-	private static final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
-
-	/**
-	 * The base {@code Path} that stores all bucket directories.
-	 */
-	private final String basePath;
-
-	/**
-	 * The {@code Bucketer} that is used to determine the path of bucket directories.
-	 */
-	private Bucketer bucketer;
-
-	/**
-	 * We have a template and call duplicate() for each parallel writer in open() to get the actual
-	 * writer that is used for the part files.
-	 */
-	private Writer<T> writerTemplate;
-
-	/**
-	 * The actual writer that we user for writing the part files.
-	 */
-	private Writer<T> writer;
-
-	/**
-	 * Maximum size of part files. If files exceed this we close and create a new one in the same
-	 * bucket directory.
-	 */
-	private long batchSize;
-
-	// These are the actually configured prefixes/suffixes
-	private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
-	private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX;
-
-	private String pendingSuffix = DEFAULT_PENDING_SUFFIX;
-	private String pendingPrefix = DEFAULT_PENDING_PREFIX;
-
-	private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
-	private String validLengthPrefix = DEFAULT_VALID_PREFIX;
-
-	private String partPrefix = DEFAULT_PART_REFIX;
-
-	/**
-	 * The timeout for asynchronous operations such as recoverLease and truncate. In
-	 * milliseconds.
-	 */
-	private long asyncTimeout = DEFAULT_ASYNC_TIMEOUT_MS;
-
-	// --------------------------------------------------------------------------------------------
-	//  Internal fields (not configurable by user)
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * The part file that we are currently writing to.
-	 */
-	private transient Path currentPartPath;
-
-	/**
-	 * The bucket directory that we are currently filling.
-	 */
-	private transient Path currentBucketDirectory;
-
-	/**
-	 * For counting the part files inside a bucket directory. Part files follow the patter
-	 * {@code "{part-prefix}-{subtask}-{count}"}. When creating new part files we increase the counter.
-	 */
-	private transient int partCounter;
-
-	/**
-	 * Tracks if the writer is currently opened or closed.
-	 */
-	private transient boolean isWriterOpen;
-
-	/**
-	 * We use reflection to get the .truncate() method, this is only available starting with
-	 * Hadoop 2.7.
-	 */
-	private transient Method refTruncate;
-
-	/**
-	 * The state object that is handled by flink from snapshot/restore. In there we store the
-	 * current part file path, the valid length of the in-progress files and pending part files.
-	 */
-	private transient BucketState bucketState;
-
-	private transient ListState<BucketState> restoredBucketStates;
-
-	/**
-	 * User-defined FileSystem parameters.
-     */
-	@Nullable
-	private Configuration fsConfig;
-
-	/**
-	 * The FileSystem reference.
-	 */
-	private transient FileSystem fs;
-	/**
-	 * Creates a new {@code RollingSink} that writes files to the given base directory.
-	 *
-	 *
-	 * <p>This uses a{@link DateTimeBucketer} as bucketer and a {@link StringWriter} has writer.
-	 * The maximum bucket size is set to 384 MB.
-	 *
-	 * @param basePath The directory to which to write the bucket files.
-	 */
-	public RollingSink(String basePath) {
-		this.basePath = basePath;
-		this.bucketer = new DateTimeBucketer();
-		this.batchSize = DEFAULT_BATCH_SIZE;
-		this.writerTemplate = new StringWriter<>();
-	}
-
-	/**
-	 * Specify a custom {@code Configuration} that will be used when creating
-	 * the {@link FileSystem} for writing.
-	 */
-	public RollingSink<T> setFSConfig(Configuration config) {
-		this.fsConfig = new Configuration();
-		fsConfig.addAll(config);
-		return this;
-	}
-
-	/**
-	 * Specify a custom {@code Configuration} that will be used when creating
-	 * the {@link FileSystem} for writing.
-	 */
-	public RollingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
-		this.fsConfig = new Configuration();
-		for (Map.Entry<String, String> entry : config) {
-			fsConfig.setString(entry.getKey(), entry.getValue());
-		}
-		return this;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
-		if (this.writerTemplate instanceof InputTypeConfigurable) {
-			((InputTypeConfigurable) writerTemplate).setInputType(type, executionConfig);
-		}
-	}
-
-	@Override
-	public void initializeState(FunctionInitializationContext context) throws Exception {
-		Preconditions.checkArgument(this.restoredBucketStates == null,
-			"The " + getClass().getSimpleName() + " has already been initialized.");
-
-		try {
-			initFileSystem();
-		} catch (IOException e) {
-			LOG.error("Error while creating FileSystem when initializing the state of the RollingSink.", e);
-			throw new RuntimeException("Error while creating FileSystem when initializing the state of the RollingSink.", e);
-		}
-
-		if (this.refTruncate == null) {
-			this.refTruncate = reflectTruncate(fs);
-		}
-
-		OperatorStateStore stateStore = context.getOperatorStateStore();
-		restoredBucketStates = stateStore.getSerializableListState("rolling-states");
-
-		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-		if (context.isRestored()) {
-			LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
-
-			for (BucketState bucketState : restoredBucketStates.get()) {
-				handleRestoredBucketState(bucketState);
-			}
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("{} (taskIdx= {}) restored {}", getClass().getSimpleName(), subtaskIndex, bucketState);
-			}
-		} else {
-			LOG.info("No state to restore for the {} (taskIdx= {}).", getClass().getSimpleName(), subtaskIndex);
-		}
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-
-		partCounter = 0;
-
-		this.writer = writerTemplate.duplicate();
-
-		bucketState = new BucketState();
-	}
-
-	/**
-	 * Create a file system with the user-defined hdfs config.
-	 * @throws IOException
-	 */
-	private void initFileSystem() throws IOException {
-		if (fs == null) {
-			Path path = new Path(basePath);
-			fs = BucketingSink.createHadoopFileSystem(path, fsConfig);
-		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		closeCurrentPartFile();
-	}
-
-	@Override
-	public void invoke(T value) throws Exception {
-		if (shouldRoll()) {
-			openNewPartFile();
-		}
-		writer.write(value);
-	}
-
-	/**
-	 * Determines whether we should change the bucket file we are writing to.
-	 *
-	 *
-	 * <p>This will roll if no file was created yet, if the file size is larger than the specified size
-	 * or if the {@code Bucketer} determines that we should roll.
-	 */
-	private boolean shouldRoll() throws IOException {
-		boolean shouldRoll = false;
-		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-		if (!isWriterOpen) {
-			shouldRoll = true;
-			LOG.debug("RollingSink {} starting new initial bucket. ", subtaskIndex);
-		}
-		if (bucketer.shouldStartNewBucket(new Path(basePath), currentBucketDirectory)) {
-			shouldRoll = true;
-			LOG.debug("RollingSink {} starting new bucket because {} said we should. ", subtaskIndex, bucketer);
-			// we will retrieve a new bucket base path in openNewPartFile so reset the part counter
-			partCounter = 0;
-		}
-		if (isWriterOpen) {
-			long writePosition = writer.getPos();
-			if (isWriterOpen && writePosition > batchSize) {
-				shouldRoll = true;
-				LOG.debug(
-						"RollingSink {} starting new bucket because file position {} is above batch size {}.",
-						subtaskIndex,
-						writePosition,
-						batchSize);
-			}
-		}
-		return shouldRoll;
-	}
-
-	/**
-	 * Opens a new part file.
-	 *
-	 *
-	 * <p>This closes the old bucket file and retrieves a new bucket path from the {@code Bucketer}.
-	 */
-	private void openNewPartFile() throws Exception {
-		closeCurrentPartFile();
-
-		Path newBucketDirectory = bucketer.getNextBucketPath(new Path(basePath));
-
-		if (!newBucketDirectory.equals(currentBucketDirectory)) {
-			currentBucketDirectory = newBucketDirectory;
-			try {
-				if (fs.mkdirs(currentBucketDirectory)) {
-					LOG.debug("Created new bucket directory: {}", currentBucketDirectory);
-				}
-			} catch (IOException e) {
-				throw new RuntimeException("Could not create base path for new rolling file.", e);
-			}
-		}
-
-		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-		currentPartPath = new Path(currentBucketDirectory, partPrefix + "-" + subtaskIndex + "-" + partCounter);
-
-		// This should work since there is only one parallel subtask that tries names with
-		// our subtask id. Otherwise we would run into concurrency issues here.
-		while (fs.exists(currentPartPath) ||
-				fs.exists(getPendingPathFor(currentPartPath)) ||
-				fs.exists(getInProgressPathFor(currentPartPath))) {
-			partCounter++;
-			currentPartPath = new Path(currentBucketDirectory, partPrefix + "-" + subtaskIndex + "-" + partCounter);
-		}
-
-		// increase, so we don't have to check for this name next time
-		partCounter++;
-
-		LOG.debug("Next part path is {}", currentPartPath.toString());
-
-		Path inProgressPath = getInProgressPathFor(currentPartPath);
-		writer.open(fs, inProgressPath);
-		isWriterOpen = true;
-	}
-
-	private Path getPendingPathFor(Path path) {
-		return new Path(path.getParent(), pendingPrefix + path.getName()).suffix(pendingSuffix);
-	}
-
-	private Path getInProgressPathFor(Path path) {
-		return new Path(path.getParent(), inProgressPrefix + path.getName()).suffix(inProgressSuffix);
-	}
-
-	private Path getValidLengthPathFor(Path path) {
-		return new Path(path.getParent(), validLengthPrefix + path.getName()).suffix(validLengthSuffix);
-	}
-
-	/**
-	 * Closes the current part file.
-	 *
-	 *
-	 * <p>This moves the current in-progress part file to a pending file and adds it to the list
-	 * of pending files in our bucket state.
-	 */
-	private void closeCurrentPartFile() throws Exception {
-		if (isWriterOpen) {
-			writer.close();
-			isWriterOpen = false;
-		}
-
-		if (currentPartPath != null) {
-			Path inProgressPath = getInProgressPathFor(currentPartPath);
-			Path pendingPath = getPendingPathFor(currentPartPath);
-			fs.rename(inProgressPath, pendingPath);
-			LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, pendingPath);
-			this.bucketState.pendingFiles.add(currentPartPath.toString());
-		}
-	}
-
-	/**
-	 * Gets the truncate() call using reflection.
-	 *
-	 * <p><b>NOTE: </b>This code comes from Flume
-	 */
-	private Method reflectTruncate(FileSystem fs) {
-		Method m = null;
-		if (fs != null) {
-			Class<?> fsClass = fs.getClass();
-			try {
-				m = fsClass.getMethod("truncate", Path.class, long.class);
-			} catch (NoSuchMethodException ex) {
-				LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
-						" and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-				return null;
-			}
-
-			// verify that truncate actually works
-			Path testPath = new Path(basePath, UUID.randomUUID().toString());
-			try {
-				try (FSDataOutputStream outputStream = fs.create(testPath)) {
-					outputStream.writeUTF("hello");
-				} catch (IOException e) {
-					LOG.error("Could not create file for checking if truncate works.", e);
-					throw new RuntimeException(
-							"Could not create file for checking if truncate works. " +
-									"You can disable support for truncate() completely via " +
-									"BucketingSink.setUseTruncate(false).", e);
-				}
-
-				try {
-					m.invoke(fs, testPath, 2);
-				} catch (IllegalAccessException | InvocationTargetException e) {
-					LOG.debug("Truncate is not supported.", e);
-					m = null;
-				}
-			} finally {
-				try {
-					fs.delete(testPath, false);
-				} catch (IOException e) {
-					LOG.error("Could not delete truncate test file.", e);
-					throw new RuntimeException("Could not delete truncate test file. " +
-							"You can disable support for truncate() completely via " +
-							"BucketingSink.setUseTruncate(false).", e);
-				}
-			}
-		}
-		return m;
-	}
-
-	@Override
-	public void notifyCheckpointComplete(long checkpointId) throws Exception {
-		synchronized (bucketState.pendingFilesPerCheckpoint) {
-			Iterator<Map.Entry<Long, List<String>>> pendingCheckpointsIt =
-				bucketState.pendingFilesPerCheckpoint.entrySet().iterator();
-
-			while (pendingCheckpointsIt.hasNext()) {
-				Map.Entry<Long, List<String>> entry = pendingCheckpointsIt.next();
-				Long pastCheckpointId = entry.getKey();
-
-				if (pastCheckpointId <= checkpointId) {
-					LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
-					// All the pending files are buckets that have been completed but are waiting to be renamed
-					// to their final name
-					for (String filename : entry.getValue()) {
-						Path finalPath = new Path(filename);
-						Path pendingPath = getPendingPathFor(finalPath);
-
-						fs.rename(pendingPath, finalPath);
-						LOG.debug("Moving pending file {} to final location after complete checkpoint {}.",
-								pendingPath, pastCheckpointId);
-					}
-					pendingCheckpointsIt.remove();
-				}
-			}
-		}
-	}
-
-	@Override
-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
-		Preconditions.checkNotNull(restoredBucketStates,
-			"The " + getClass().getSimpleName() + " has not been properly initialized.");
-
-		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
-
-		if (isWriterOpen) {
-			bucketState.currentFile = currentPartPath.toString();
-			bucketState.currentFileValidLength = writer.flush();
-		}
-
-		synchronized (bucketState.pendingFilesPerCheckpoint) {
-			bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
-		}
-		bucketState.pendingFiles = new ArrayList<>();
-
-		restoredBucketStates.clear();
-		restoredBucketStates.add(bucketState);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("{} (taskIdx={}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, bucketState);
-		}
-	}
-
-	private void handleRestoredBucketState(BucketState bucketState) {
-		// we can clean all the pending files since they were renamed to
-		// final files after this checkpoint was successful
-		// (we re-start from the last **successful** checkpoint)
-		bucketState.pendingFiles.clear();
-
-		if (bucketState.currentFile != null) {
-			// We were writing to a file when the last checkpoint occurred. This file can either
-			// be still in-progress or became a pending file at some point after the checkpoint.
-			// Either way, we have to truncate it back to a valid state (or write a .valid-length
-			// file that specifies up to which length it is valid) and rename it to the final name
-			// before starting a new bucket file.
-			Path partPath = new Path(bucketState.currentFile);
-			try {
-				Path partPendingPath = getPendingPathFor(partPath);
-				Path partInProgressPath = getInProgressPathFor(partPath);
-
-				if (fs.exists(partPendingPath)) {
-					LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
-					// has been moved to pending in the mean time, rename to final location
-					fs.rename(partPendingPath, partPath);
-				} else if (fs.exists(partInProgressPath)) {
-					LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
-					// it was still in progress, rename to final path
-					fs.rename(partInProgressPath, partPath);
-				} else if (fs.exists(partPath)) {
-					LOG.debug("In-Progress file {} was already moved to final location {}.", bucketState.currentFile, partPath);
-				} else {
-					LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
-							"it was moved to final location by a previous snapshot restore", bucketState.currentFile);
-				}
-
-				if (this.refTruncate == null) {
-					this.refTruncate = reflectTruncate(fs);
-				}
-
-				// truncate it or write a ".valid-length" file to specify up to which point it is valid
-				if (refTruncate != null) {
-					LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength);
-					// some-one else might still hold the lease from a previous try, we are
-					// recovering, after all ...
-					if (fs instanceof DistributedFileSystem) {
-						DistributedFileSystem dfs = (DistributedFileSystem) fs;
-						LOG.debug("Trying to recover file lease {}", partPath);
-						dfs.recoverLease(partPath);
-						boolean isclosed = dfs.isFileClosed(partPath);
-						StopWatch sw = new StopWatch();
-						sw.start();
-						while (!isclosed) {
-							if (sw.getTime() > asyncTimeout) {
-								break;
-							}
-							try {
-								Thread.sleep(500);
-							} catch (InterruptedException e1) {
-								// ignore it
-							}
-							isclosed = dfs.isFileClosed(partPath);
-						}
-					}
-					Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
-					if (!truncated) {
-						LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
-
-						// we must wait for the asynchronous truncate operation to complete
-						StopWatch sw = new StopWatch();
-						sw.start();
-						long newLen = fs.getFileStatus(partPath).getLen();
-						while (newLen != bucketState.currentFileValidLength) {
-							if (sw.getTime() > asyncTimeout) {
-								break;
-							}
-							try {
-								Thread.sleep(500);
-							} catch (InterruptedException e1) {
-								// ignore it
-							}
-							newLen = fs.getFileStatus(partPath).getLen();
-						}
-						if (newLen != bucketState.currentFileValidLength) {
-							throw new RuntimeException("Truncate did not truncate to right length. Should be " + bucketState.currentFileValidLength + " is " + newLen + ".");
-						}
-					}
-
-				} else {
-					Path validLengthFilePath = getValidLengthPathFor(partPath);
-					if (!fs.exists(validLengthFilePath) && fs.exists(partPath)) {
-						LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
-						try (FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath)) {
-							lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
-						}
-					}
-				}
-
-				// invalidate in the state object
-				bucketState.currentFile = null;
-				bucketState.currentFileValidLength = -1;
-				isWriterOpen = false;
-			} catch (IOException e) {
-				LOG.error("Error while restoring RollingSink state.", e);
-				throw new RuntimeException("Error while restoring RollingSink state.", e);
-			} catch (InvocationTargetException | IllegalAccessException e) {
-				LOG.error("Could not invoke truncate.", e);
-				throw new RuntimeException("Could not invoke truncate.", e);
-			}
-		}
-
-		// Move files that are confirmed by a checkpoint but did not get moved to final location
-		// because the checkpoint notification did not happen before a failure
-
-		Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
-		LOG.debug("Moving pending files to final location on restore.");
-		for (Long pastCheckpointId : pastCheckpointIds) {
-			// All the pending files are buckets that have been completed but are waiting to be renamed
-			// to their final name
-			for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
-				Path finalPath = new Path(filename);
-				Path pendingPath = getPendingPathFor(finalPath);
-
-				try {
-					if (fs.exists(pendingPath)) {
-						LOG.debug("(RESTORE) Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
-						fs.rename(pendingPath, finalPath);
-					}
-				} catch (IOException e) {
-					LOG.error("(RESTORE) Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
-					throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
-				}
-			}
-		}
-
-		synchronized (bucketState.pendingFilesPerCheckpoint) {
-			bucketState.pendingFilesPerCheckpoint.clear();
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Setters for User configuration values
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Sets the maximum bucket size in bytes.
-	 *
-	 *
-	 * <p>When a bucket part file becomes larger than this size a new bucket part file is started and
-	 * the old one is closed. The name of the bucket files depends on the {@link Bucketer}.
-	 *
-	 * @param batchSize The bucket part file size in bytes.
-	 */
-	public RollingSink<T> setBatchSize(long batchSize) {
-		this.batchSize = batchSize;
-		return this;
-	}
-
-	/**
-	 * Sets the {@link Bucketer} to use for determining the bucket files to write to.
-	 *
-	 * @param bucketer The bucketer to use.
-	 */
-	public RollingSink<T> setBucketer(Bucketer bucketer) {
-		this.bucketer = bucketer;
-		return this;
-	}
-
-	/**
-	 * Sets the {@link Writer} to be used for writing the incoming elements to bucket files.
-	 *
-	 * @param writer The {@code Writer} to use.
-	 */
-	public RollingSink<T> setWriter(Writer<T> writer) {
-		this.writerTemplate = writer;
-		return this;
-	}
-
-	/**
-	 * Sets the suffix of in-progress part files. The default is {@code "in-progress"}.
-	 */
-	public RollingSink<T> setInProgressSuffix(String inProgressSuffix) {
-		this.inProgressSuffix = inProgressSuffix;
-		return this;
-	}
-
-	/**
-	 * Sets the prefix of in-progress part files. The default is {@code "_"}.
-	 */
-	public RollingSink<T> setInProgressPrefix(String inProgressPrefix) {
-		this.inProgressPrefix = inProgressPrefix;
-		return this;
-	}
-
-	/**
-	 * Sets the suffix of pending part files. The default is {@code ".pending"}.
-	 */
-	public RollingSink<T> setPendingSuffix(String pendingSuffix) {
-		this.pendingSuffix = pendingSuffix;
-		return this;
-	}
-
-	/**
-	 * Sets the prefix of pending part files. The default is {@code "_"}.
-	 */
-	public RollingSink<T> setPendingPrefix(String pendingPrefix) {
-		this.pendingPrefix = pendingPrefix;
-		return this;
-	}
-
-	/**
-	 * Sets the suffix of valid-length files. The default is {@code ".valid-length"}.
-	 */
-	public RollingSink<T> setValidLengthSuffix(String validLengthSuffix) {
-		this.validLengthSuffix = validLengthSuffix;
-		return this;
-	}
-
-	/**
-	 * Sets the prefix of valid-length files. The default is {@code "_"}.
-	 */
-	public RollingSink<T> setValidLengthPrefix(String validLengthPrefix) {
-		this.validLengthPrefix = validLengthPrefix;
-		return this;
-	}
-
-	/**
-	 * Sets the prefix of part files.  The default is {@code "part"}.
-	 */
-	public RollingSink<T> setPartPrefix(String partPrefix) {
-		this.partPrefix = partPrefix;
-		return this;
-	}
-
-	/**
-	 * Disable cleanup of leftover in-progress/pending files when the sink is opened.
-	 *
-	 *
-	 * <p>This should only be disabled if using the sink without checkpoints, to not remove
-	 * the files already in the directory.
-	 *
-	 * @deprecated This option is deprecated and remains only for backwards compatibility.
-	 * We do not clean up lingering files anymore.
-	 */
-	@Deprecated
-	public RollingSink<T> disableCleanupOnOpen() {
-		return this;
-	}
-
-	/**
-	 * Sets the default timeout for asynchronous operations such as recoverLease and truncate.
-	 *
-	 * @param timeout The timeout, in milliseconds.
-	 */
-	public RollingSink<T> setAsyncTimeout(long timeout) {
-		this.asyncTimeout = timeout;
-		return this;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Internal Classes
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * This is used for keeping track of the current in-progress files and files that we mark
-	 * for moving from pending to final location after we get a checkpoint-complete notification.
-	 */
-	public static final class BucketState implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		/**
-		 * The file that was in-progress when the last checkpoint occurred.
-		 */
-		public String currentFile;
-
-		/**
-		 * The valid length of the in-progress file at the time of the last checkpoint.
-		 */
-		public long currentFileValidLength = -1;
-
-		/**
-		 * Pending files that accumulated since the last checkpoint.
-		 */
-		public List<String> pendingFiles = new ArrayList<>();
-
-		/**
-		 * When doing a checkpoint we move the pending files since the last checkpoint to this map
-		 * with the id of the checkpoint. When we get the checkpoint-complete notification we move
-		 * pending files of completed checkpoints to their final location.
-		 */
-		public final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
-
-		@Override
-		public String toString() {
-			return
-				"In-progress=" + currentFile +
-				" validLength=" + currentFileValidLength +
-				" pendingForNextCheckpoint=" + pendingFiles +
-				" pendingForPrevCheckpoints=" + pendingFilesPerCheckpoint;
-		}
-	}
-}
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index b1d1786..a857a35 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -33,7 +33,6 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.fs.Clock;
-import org.apache.flink.streaming.connectors.fs.RollingSink;
 import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.connectors.fs.Writer;
@@ -780,20 +779,6 @@ public class BucketingSink<T>
 		}
 	}
 
-	private void handleRestoredRollingSinkState(RollingSink.BucketState restoredState) {
-		restoredState.pendingFiles.clear();
-
-		handlePendingInProgressFile(restoredState.currentFile, restoredState.currentFileValidLength);
-
-		// Now that we've restored the bucket to a valid state, reset the current file info
-		restoredState.currentFile = null;
-		restoredState.currentFileValidLength = -1;
-
-		handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint);
-
-		restoredState.pendingFilesPerCheckpoint.clear();
-	}
-
 	private void handlePendingInProgressFile(String file, long validLength) {
 		if (file != null) {
 
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
deleted file mode 100644
index daa8884..0000000
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
-import org.apache.flink.util.NetUtils;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.IN_PROGRESS_SUFFIX;
-import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.PENDING_SUFFIX;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link org.apache.flink.streaming.connectors.fs.RollingSink}.
- *
- *
- * <p>This test only verifies the exactly once behaviour of the sink. Another test tests the
- * rolling behaviour.
- *
- * @deprecated should be removed with the {@link RollingSink}.
- */
-@Deprecated
-public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase {
-
-	private static final long NUM_STRINGS = 16_000;
-
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-	private static MiniDFSCluster hdfsCluster;
-	private static org.apache.hadoop.fs.FileSystem dfs;
-
-	private static String outPath;
-
-	@Before
-	public void createHDFS() throws IOException {
-		Configuration conf = new Configuration();
-
-		File dataDir = tempFolder.newFolder();
-
-		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
-		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
-		hdfsCluster = builder.build();
-
-		dfs = hdfsCluster.getFileSystem();
-
-		outPath = "hdfs://"
-				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
-				+ "/string-non-rolling-out";
-	}
-
-	@After
-	public void destroyHDFS() {
-		if (hdfsCluster != null) {
-			hdfsCluster.shutdown();
-		}
-	}
-
-	@Override
-	public void testProgram(StreamExecutionEnvironment env) {
-		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
-		env.enableCheckpointing(20);
-		env.setParallelism(12);
-		env.disableOperatorChaining();
-
-		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
-
-		DataStream<String> mapped = stream
-				.map(new OnceFailingIdentityMapper(NUM_STRINGS));
-
-		RollingSink<String> sink = new RollingSink<String>(outPath)
-				.setBucketer(new NonRollingBucketer())
-				.setBatchSize(10000)
-				.setValidLengthPrefix("")
-				.setPendingPrefix("")
-				.setPendingSuffix(PENDING_SUFFIX)
-				.setInProgressSuffix(IN_PROGRESS_SUFFIX);
-
-		mapped.addSink(sink);
-
-	}
-
-	@Override
-	public void postSubmit() throws Exception {
-		// We read the files and verify that we have read all the strings. If a valid-length
-		// file exists we only read the file to that point. (This test should work with
-		// FileSystems that support truncate() and with others as well.)
-
-		Pattern messageRegex = Pattern.compile("message (\\d*)");
-
-		// Keep a set of the message IDs that we read. The size must equal the read count and
-		// the NUM_STRINGS. If numRead is bigger than the size of the set we have seen some
-		// elements twice.
-		Set<Integer> readNumbers = new HashSet<>();
-
-		HashSet<String> uniqMessagesRead = new HashSet<>();
-		HashSet<String> messagesInCommittedFiles = new HashSet<>();
-
-		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(
-				outPath), true);
-
-		while (files.hasNext()) {
-			LocatedFileStatus file = files.next();
-
-			if (!file.getPath().toString().endsWith(".valid-length")) {
-				int validLength = (int) file.getLen();
-				if (dfs.exists(file.getPath().suffix(".valid-length"))) {
-					FSDataInputStream inStream = dfs.open(file.getPath().suffix(".valid-length"));
-					String validLengthString = inStream.readUTF();
-					validLength = Integer.parseInt(validLengthString);
-					System.out.println("VALID LENGTH: " + validLength);
-				}
-				FSDataInputStream inStream = dfs.open(file.getPath());
-				byte[] buffer = new byte[validLength];
-				inStream.readFully(0, buffer, 0, validLength);
-				inStream.close();
-
-				ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
-
-				InputStreamReader inStreamReader = new InputStreamReader(bais);
-				BufferedReader br = new BufferedReader(inStreamReader);
-
-				String line = br.readLine();
-				while (line != null) {
-					Matcher matcher = messageRegex.matcher(line);
-					if (matcher.matches()) {
-						uniqMessagesRead.add(line);
-
-						// check that in the committed files there are no duplicates
-						if (!file.getPath().toString().endsWith(IN_PROGRESS_SUFFIX) && !file.getPath().toString().endsWith(PENDING_SUFFIX)) {
-							if (!messagesInCommittedFiles.add(line)) {
-								Assert.fail("Duplicate entry in committed bucket.");
-							}
-						}
-
-						int messageId = Integer.parseInt(matcher.group(1));
-						readNumbers.add(messageId);
-					} else {
-						Assert.fail("Read line does not match expected pattern.");
-					}
-					line = br.readLine();
-				}
-				br.close();
-				inStreamReader.close();
-				bais.close();
-			}
-		}
-
-		// Verify that we read all strings (at-least-once)
-		Assert.assertEquals(NUM_STRINGS, readNumbers.size());
-
-		// Verify that we don't have duplicates (boom!, exactly-once)
-		Assert.assertEquals(NUM_STRINGS, uniqMessagesRead.size());
-	}
-
-	private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> {
-		private static final long serialVersionUID = 1L;
-
-		private static volatile boolean hasFailed = false;
-
-		private final long numElements;
-
-		private long failurePos;
-		private long count;
-
-		OnceFailingIdentityMapper(long numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void open(org.apache.flink.configuration.Configuration parameters) throws IOException {
-			long failurePosMin = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-			long failurePosMax = (long) (0.9 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-
-			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
-			count = 0;
-		}
-
-		@Override
-		public String map(String value) throws Exception {
-			count++;
-			if (!hasFailed && count >= failurePos) {
-				hasFailed = true;
-				throw new Exception("Test Failure");
-			}
-
-			return value;
-		}
-	}
-
-	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
-			implements ListCheckpointed<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final long numElements;
-
-		private int index;
-
-		private volatile boolean isRunning = true;
-
-		StringGeneratingSourceFunction(long numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void run(SourceContext<String> ctx) throws Exception {
-			final Object lockingObject = ctx.getCheckpointLock();
-
-			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
-
-			if (index == 0) {
-				index = getRuntimeContext().getIndexOfThisSubtask();
-			}
-
-			while (isRunning && index < numElements) {
-
-				Thread.sleep(1);
-				synchronized (lockingObject) {
-					ctx.collect("message " + index);
-					index += step;
-				}
-			}
-		}
-
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-
-		private static String randomString(StringBuilder bld, Random rnd) {
-			final int len = rnd.nextInt(10) + 5;
-
-			for (int i = 0; i < len; i++) {
-				char next = (char) (rnd.nextInt(20000) + 33);
-				bld.append(next);
-			}
-
-			return bld.toString();
-		}
-
-		@Override
-		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
-			return Collections.singletonList(this.index);
-		}
-
-		@Override
-		public void restoreState(List<Integer> state) throws Exception {
-			if (state.isEmpty() || state.size() > 1) {
-				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
-			}
-			this.index = state.get(0);
-		}
-	}
-}
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
deleted file mode 100644
index 41b329f..0000000
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ /dev/null
@@ -1,981 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.testutils.MiniClusterResource;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.AvroKeyValue;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericData.StringType;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTestUtils.checkLocalFs;
-
-/**
- * Tests for {@link RollingSink}. These
- * tests test the different output methods as well as the rolling feature using a manual clock
- * that increases time in lockstep with element computation using latches.
- *
- *
- * <p>This only tests the rolling behaviour of the sink. There is a separate ITCase that verifies
- * exactly once behaviour.
- *
- * @deprecated should be removed with the {@link RollingSink}.
- */
-@Deprecated
-public class RollingSinkITCase extends TestLogger {
-
-	protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkITCase.class);
-
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-	protected static MiniClusterResource miniClusterResource;
-	protected static MiniDFSCluster hdfsCluster;
-	protected static org.apache.hadoop.fs.FileSystem dfs;
-	protected static String hdfsURI;
-	protected static Configuration conf = new Configuration();
-
-	protected static File dataDir;
-
-	private final int maxParallelism = 10;
-
-	@BeforeClass
-	public static void setup() throws Exception {
-
-		LOG.info("In RollingSinkITCase: Starting MiniDFSCluster ");
-
-		dataDir = tempFolder.newFolder();
-
-		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
-		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
-		hdfsCluster = builder.build();
-
-		dfs = hdfsCluster.getFileSystem();
-
-		hdfsURI = "hdfs://"
-				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
-				+ "/";
-
-		miniClusterResource = new MiniClusterResource(
-			new MiniClusterResourceConfiguration.Builder()
-				.setNumberTaskManagers(1)
-				.setNumberSlotsPerTaskManager(4)
-				.build());
-
-		miniClusterResource.before();
-	}
-
-	@AfterClass
-	public static void teardown() throws Exception {
-		LOG.info("In RollingSinkITCase: tearing down MiniDFSCluster ");
-		hdfsCluster.shutdown();
-
-		if (miniClusterResource != null) {
-			miniClusterResource.after();
-		}
-	}
-
-	/**
-	 * This tests {@link StringWriter} with
-	 * non-rolling output.
-	 */
-	@Test
-	public void testNonRollingStringWriter() throws Exception {
-		final int numElements = 20;
-		final String outPath = hdfsURI + "/string-non-rolling-out";
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(2);
-
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
-				.broadcast()
-				.filter(new OddEvenFilter());
-
-		RollingSink<String> sink = new RollingSink<String>(outPath)
-				.setBucketer(new NonRollingBucketer())
-				.setPartPrefix("part")
-				.setPendingPrefix("")
-				.setPendingSuffix("");
-
-		source
-				.map(new MapFunction<Tuple2<Integer, String>, String>() {
-					private static final long serialVersionUID = 1L;
-					@Override
-					public String map(Tuple2<Integer, String> value) throws Exception {
-						return value.f1;
-					}
-				})
-				.addSink(sink);
-
-		env.execute("RollingSink String Write Test");
-
-		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
-
-		BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
-		for (int i = 0; i < numElements; i += 2) {
-			String line = br.readLine();
-			Assert.assertEquals("message #" + i, line);
-		}
-
-		inStream.close();
-
-		inStream = dfs.open(new Path(outPath + "/part-1-0"));
-
-		br = new BufferedReader(new InputStreamReader(inStream));
-
-		for (int i = 1; i < numElements; i += 2) {
-			String line = br.readLine();
-			Assert.assertEquals("message #" + i, line);
-		}
-
-		inStream.close();
-	}
-
-	/**
-	 * This tests {@link SequenceFileWriter}
-	 * with non-rolling output and without compression.
-	 */
-	@Test
-	public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {
-		final int numElements = 20;
-		final String outPath = hdfsURI + "/seq-no-comp-non-rolling-out";
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(2);
-
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
-				.broadcast()
-				.filter(new OddEvenFilter());
-
-		DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new MapFunction<Tuple2<Integer, String>, Tuple2<IntWritable, Text>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
-				return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
-			}
-		});
-
-		RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
-				.setWriter(new SequenceFileWriter<IntWritable, Text>())
-				.setBucketer(new NonRollingBucketer())
-				.setPartPrefix("part")
-				.setPendingPrefix("")
-				.setPendingSuffix("");
-
-		mapped.addSink(sink);
-
-		env.execute("RollingSink String Write Test");
-
-		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
-
-		SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
-				1000,
-				0,
-				100000,
-				new Configuration());
-
-		IntWritable intWritable = new IntWritable();
-		Text txt = new Text();
-
-		for (int i = 0; i < numElements; i += 2) {
-			reader.next(intWritable, txt);
-			Assert.assertEquals(i, intWritable.get());
-			Assert.assertEquals("message #" + i, txt.toString());
-		}
-
-		reader.close();
-		inStream.close();
-
-		inStream = dfs.open(new Path(outPath + "/part-1-0"));
-
-		reader = new SequenceFile.Reader(inStream,
-				1000,
-				0,
-				100000,
-				new Configuration());
-
-		for (int i = 1; i < numElements; i += 2) {
-			reader.next(intWritable, txt);
-			Assert.assertEquals(i, intWritable.get());
-			Assert.assertEquals("message #" + i, txt.toString());
-		}
-
-		reader.close();
-		inStream.close();
-	}
-
-	/**
-	 * This tests {@link SequenceFileWriter}
-	 * with non-rolling output but with compression.
-	 */
-	@Test
-	public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {
-		final int numElements = 20;
-		final String outPath = hdfsURI + "/seq-non-rolling-out";
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(2);
-
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
-				.broadcast()
-				.filter(new OddEvenFilter());
-
-		DataStream<Tuple2<IntWritable, Text>> mapped =  source.map(new MapFunction<Tuple2<Integer, String>, Tuple2<IntWritable, Text>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Tuple2<IntWritable, Text> map(Tuple2<Integer, String> value) throws Exception {
-				return Tuple2.of(new IntWritable(value.f0), new Text(value.f1));
-			}
-		});
-
-		RollingSink<Tuple2<IntWritable, Text>> sink = new RollingSink<Tuple2<IntWritable, Text>>(outPath)
-				.setWriter(new SequenceFileWriter<IntWritable, Text>("Default", SequenceFile.CompressionType.BLOCK))
-				.setBucketer(new NonRollingBucketer())
-				.setPartPrefix("part")
-				.setPendingPrefix("")
-				.setPendingSuffix("");
-
-		mapped.addSink(sink);
-
-		env.execute("RollingSink String Write Test");
-
-		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
-
-		SequenceFile.Reader reader = new SequenceFile.Reader(inStream,
-				1000,
-				0,
-				100000,
-				new Configuration());
-
-		IntWritable intWritable = new IntWritable();
-		Text txt = new Text();
-
-		for (int i = 0; i < numElements; i += 2) {
-			reader.next(intWritable, txt);
-			Assert.assertEquals(i, intWritable.get());
-			Assert.assertEquals("message #" + i, txt.toString());
-		}
-
-		reader.close();
-		inStream.close();
-
-		inStream = dfs.open(new Path(outPath + "/part-1-0"));
-
-		reader = new SequenceFile.Reader(inStream,
-				1000,
-				0,
-				100000,
-				new Configuration());
-
-		for (int i = 1; i < numElements; i += 2) {
-			reader.next(intWritable, txt);
-			Assert.assertEquals(i, intWritable.get());
-			Assert.assertEquals("message #" + i, txt.toString());
-		}
-
-		reader.close();
-		inStream.close();
-	}
-
-	/**
-	 * This tests {@link AvroKeyValueSinkWriter}
-	 * with non-rolling output and without compression.
-	 */
-	@Test
-	public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws Exception {
-		final int numElements = 20;
-		final String outPath = hdfsURI + "/avro-kv-no-comp-non-rolling-out";
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(2);
-
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
-				.broadcast()
-				.filter(new OddEvenFilter());
-
-		Map<String, String> properties = new HashMap<>();
-		Schema keySchema = Schema.create(Type.INT);
-		Schema valueSchema = Schema.create(Type.STRING);
-		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString());
-		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString());
-		RollingSink<Tuple2<Integer, String>> sink = new RollingSink<Tuple2<Integer, String>>(outPath)
-				.setWriter(new AvroKeyValueSinkWriter<Integer, String>(properties))
-				.setBucketer(new NonRollingBucketer())
-				.setPartPrefix("part")
-				.setPendingPrefix("")
-				.setPendingSuffix("");
-
-		source.addSink(sink);
-
-		env.execute("RollingSink Avro KeyValue Writer Test");
-
-		GenericData.setStringType(valueSchema, StringType.String);
-		Schema elementSchema = AvroKeyValue.getSchema(keySchema, valueSchema);
-
-		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
-		SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<GenericRecord>(elementSchema);
-		DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
-		for (int i = 0; i < numElements; i += 2) {
-			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
-			int key = wrappedEntry.getKey().intValue();
-			Assert.assertEquals(i, key);
-			String value = wrappedEntry.getValue();
-			Assert.assertEquals("message #" + i, value);
-		}
-
-		dataFileStream.close();
-		inStream.close();
-
-		inStream = dfs.open(new Path(outPath + "/part-1-0"));
-		dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
-
-		for (int i = 1; i < numElements; i += 2) {
-			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
-			int key = wrappedEntry.getKey().intValue();
-			Assert.assertEquals(i, key);
-			String value = wrappedEntry.getValue();
-			Assert.assertEquals("message #" + i, value);
-		}
-
-		dataFileStream.close();
-		inStream.close();
-	}
-
-	/**
-	 * This tests {@link AvroKeyValueSinkWriter}
-	 * with non-rolling output and with compression.
-	 */
-	@Test
-	public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception {
-		final int numElements = 20;
-		final String outPath = hdfsURI + "/avro-kv-no-comp-non-rolling-out";
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(2);
-
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
-				.broadcast()
-				.filter(new OddEvenFilter());
-
-		Map<String, String> properties = new HashMap<>();
-		Schema keySchema = Schema.create(Type.INT);
-		Schema valueSchema = Schema.create(Type.STRING);
-		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString());
-		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString());
-		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true));
-		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
-		RollingSink<Tuple2<Integer, String>> sink = new RollingSink<Tuple2<Integer, String>>(outPath)
-				.setWriter(new AvroKeyValueSinkWriter<Integer, String>(properties))
-				.setBucketer(new NonRollingBucketer())
-				.setPartPrefix("part")
-				.setPendingPrefix("")
-				.setPendingSuffix("");
-
-		source.addSink(sink);
-
-		env.execute("RollingSink Avro KeyValue Writer Test");
-
-		GenericData.setStringType(valueSchema, StringType.String);
-		Schema elementSchema = AvroKeyValue.getSchema(keySchema, valueSchema);
-
-		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
-		SpecificDatumReader<GenericRecord> elementReader = new SpecificDatumReader<GenericRecord>(elementSchema);
-		DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
-		for (int i = 0; i < numElements; i += 2) {
-			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
-			int key = wrappedEntry.getKey().intValue();
-			Assert.assertEquals(i, key);
-			String value = wrappedEntry.getValue();
-			Assert.assertEquals("message #" + i, value);
-		}
-
-		dataFileStream.close();
-		inStream.close();
-
-		inStream = dfs.open(new Path(outPath + "/part-1-0"));
-		dataFileStream = new DataFileStream<GenericRecord>(inStream, elementReader);
-
-		for (int i = 1; i < numElements; i += 2) {
-			AvroKeyValue<Integer, String> wrappedEntry = new AvroKeyValue<Integer, String>(dataFileStream.next());
-			int key = wrappedEntry.getKey().intValue();
-			Assert.assertEquals(i, key);
-			String value = wrappedEntry.getValue();
-			Assert.assertEquals("message #" + i, value);
-		}
-
-		dataFileStream.close();
-		inStream.close();
-	}
-
-	/**
-	 * This tests user defined hdfs configuration.
-	 * @throws Exception
-     */
-	@Test
-	public void testUserDefinedConfiguration() throws Exception {
-		final int numElements = 20;
-		final String outPath = hdfsURI + "/string-non-rolling-with-config";
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(2);
-
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction(numElements))
-			.broadcast()
-			.filter(new OddEvenFilter());
-
-		Configuration conf = new Configuration();
-		conf.set("io.file.buffer.size", "40960");
-		RollingSink<String> sink = new RollingSink<String>(outPath)
-			.setFSConfig(conf)
-			.setWriter(new StreamWriterWithConfigCheck<String>("io.file.buffer.size", "40960"))
-			.setBucketer(new NonRollingBucketer())
-			.setPartPrefix("part")
-			.setPendingPrefix("")
-			.setPendingSuffix("");
-
-		source
-			.map(new MapFunction<Tuple2<Integer, String>, String>() {
-				private static final long serialVersionUID = 1L;
-				@Override
-				public String map(Tuple2<Integer, String> value) throws Exception {
-					return value.f1;
-				}
-			})
-			.addSink(sink);
-
-		env.execute("RollingSink with configuration Test");
-
-		FSDataInputStream inStream = dfs.open(new Path(outPath + "/part-0-0"));
-
-		BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
-		for (int i = 0; i < numElements; i += 2) {
-			String line = br.readLine();
-			Assert.assertEquals("message #" + i, line);
-		}
-
-		inStream.close();
-
-		inStream = dfs.open(new Path(outPath + "/part-1-0"));
-
-		br = new BufferedReader(new InputStreamReader(inStream));
-
-		for (int i = 1; i < numElements; i += 2) {
-			String line = br.readLine();
-			Assert.assertEquals("message #" + i, line);
-		}
-
-		inStream.close();
-	}
-
-	// we use this to synchronize the clock changes to elements being processed
-	private static final MultiShotLatch latch1 = new MultiShotLatch();
-	private static final MultiShotLatch latch2 = new MultiShotLatch();
-
-	/**
-	 * This uses {@link org.apache.flink.streaming.connectors.fs.DateTimeBucketer} to
-	 * produce rolling files. The clock of DateTimeBucketer is set to
-	 * {@link ModifyableClock} to keep the time in lockstep with the processing of elements using
-	 * latches.
-	 */
-	@Test
-	public void testDateTimeRollingStringWriter() throws Exception {
-		final int numElements = 20;
-		final String outPath = hdfsURI + "/rolling-out";
-		DateTimeBucketer.setClock(new ModifyableClock());
-		ModifyableClock.setCurrentTime(0);
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(2);
-
-		DataStream<Tuple2<Integer, String>> source = env.addSource(new WaitingTestSourceFunction(
-				numElements))
-				.broadcast();
-
-		// the parallel flatMap is chained to the sink, so when it has seen 5 elements it can
-		// fire the latch
-		DataStream<String> mapped = source
-				.flatMap(new RichFlatMapFunction<Tuple2<Integer, String>, String>() {
-					private static final long serialVersionUID = 1L;
-
-					int count = 0;
-					@Override
-					public void flatMap(Tuple2<Integer, String> value,
-							Collector<String> out) throws Exception {
-						out.collect(value.f1);
-						count++;
-						if (count >= 5) {
-							if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
-								latch1.trigger();
-							} else {
-								latch2.trigger();
-							}
-							count = 0;
-						}
-					}
-
-				});
-
-		RollingSink<String> sink = new RollingSink<String>(outPath)
-				.setBucketer(new DateTimeBucketer("ss"))
-				.setPartPrefix("part")
-				.setPendingPrefix("")
-				.setPendingSuffix("");
-
-		mapped.addSink(sink);
-
-		env.execute("RollingSink String Write Test");
-
-		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(outPath), true);
-
-		// we should have 8 rolling files, 4 time intervals and parallelism of 2
-		int numFiles = 0;
-		while (files.hasNext()) {
-			LocatedFileStatus file = files.next();
-			numFiles++;
-			if (file.getPath().toString().contains("rolling-out/00")) {
-				FSDataInputStream inStream = dfs.open(file.getPath());
-
-				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
-				for (int i = 0; i < 5; i++) {
-					String line = br.readLine();
-					Assert.assertEquals("message #" + i, line);
-				}
-
-				inStream.close();
-			} else if (file.getPath().toString().contains("rolling-out/05")) {
-				FSDataInputStream inStream = dfs.open(file.getPath());
-
-				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
-				for (int i = 5; i < 10; i++) {
-					String line = br.readLine();
-					Assert.assertEquals("message #" + i, line);
-				}
-
-				inStream.close();
-			} else if (file.getPath().toString().contains("rolling-out/10")) {
-				FSDataInputStream inStream = dfs.open(file.getPath());
-
-				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
-				for (int i = 10; i < 15; i++) {
-					String line = br.readLine();
-					Assert.assertEquals("message #" + i, line);
-				}
-
-				inStream.close();
-			} else if (file.getPath().toString().contains("rolling-out/15")) {
-				FSDataInputStream inStream = dfs.open(file.getPath());
-
-				BufferedReader br = new BufferedReader(new InputStreamReader(inStream));
-
-				for (int i = 15; i < 20; i++) {
-					String line = br.readLine();
-					Assert.assertEquals("message #" + i, line);
-				}
-
-				inStream.close();
-			} else {
-				Assert.fail("File " + file + " does not match any expected roll pattern.");
-			}
-		}
-
-		Assert.assertEquals(8, numFiles);
-	}
-
-	private static final String PART_PREFIX = "part";
-	private static final String PENDING_SUFFIX = ".pending";
-	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
-	private static final String VALID_LENGTH_SUFFIX = ".valid";
-
-	@Test
-	public void testBucketStateTransitions() throws Exception {
-		final File outDir = tempFolder.newFolder();
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0);
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.setProcessingTime(0L);
-
-		// we have a bucket size of 5 bytes, so each record will get its own bucket,
-		// i.e. the bucket should roll after every record.
-
-		testHarness.processElement(new StreamRecord<>("test1", 1L));
-		testHarness.processElement(new StreamRecord<>("test2", 1L));
-		checkLocalFs(outDir, 1, 1 , 0, 0);
-
-		testHarness.processElement(new StreamRecord<>("test3", 1L));
-		checkLocalFs(outDir, 1, 2, 0, 0);
-
-		testHarness.snapshot(0, 0);
-		checkLocalFs(outDir, 1, 2, 0, 0);
-
-		testHarness.notifyOfCompletedCheckpoint(0);
-		checkLocalFs(outDir, 1, 0, 2, 0);
-
-		OperatorSubtaskState snapshot = testHarness.snapshot(1, 0);
-
-		testHarness.close();
-		checkLocalFs(outDir, 0, 1, 2, 0);
-
-		testHarness = createRescalingTestSink(outDir, 1, 0);
-		testHarness.setup();
-		testHarness.initializeState(snapshot);
-		testHarness.open();
-		checkLocalFs(outDir, 0, 0, 3, 1);
-
-		snapshot = testHarness.snapshot(2, 0);
-
-		testHarness.processElement(new StreamRecord<>("test4", 10));
-		checkLocalFs(outDir, 1, 0, 3, 1);
-
-		testHarness = createRescalingTestSink(outDir, 1, 0);
-		testHarness.setup();
-		testHarness.initializeState(snapshot);
-		testHarness.open();
-
-		// the in-progress file remains as we do not clean up now
-		checkLocalFs(outDir, 1, 0, 3, 1);
-
-		testHarness.close();
-
-		// at close it is not moved to final because it is not part
-		// of the current task's state, it was just a not cleaned up leftover.
-		checkLocalFs(outDir, 1, 0, 3, 1);
-	}
-
-	@Test
-	public void testScalingDown() throws Exception {
-		final File outDir = tempFolder.newFolder();
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 3, 0);
-		testHarness1.setup();
-		testHarness1.open();
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 3, 1);
-		testHarness2.setup();
-		testHarness2.open();
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2);
-		testHarness3.setup();
-		testHarness3.open();
-
-		testHarness1.processElement(new StreamRecord<>("test1", 0L));
-		checkLocalFs(outDir, 1, 0, 0, 0);
-
-		testHarness2.processElement(new StreamRecord<>("test2", 0L));
-		testHarness2.processElement(new StreamRecord<>("test3", 0L));
-		testHarness2.processElement(new StreamRecord<>("test4", 0L));
-		testHarness2.processElement(new StreamRecord<>("test5", 0L));
-		testHarness2.processElement(new StreamRecord<>("test6", 0L));
-		checkLocalFs(outDir, 2, 4, 0, 0);
-
-		testHarness3.processElement(new StreamRecord<>("test7", 0L));
-		testHarness3.processElement(new StreamRecord<>("test8", 0L));
-		checkLocalFs(outDir, 3, 5, 0, 0);
-
-		// intentionally we snapshot them in a not ascending order so that the states are shuffled
-		OperatorSubtaskState mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
-			testHarness3.snapshot(0, 0),
-			testHarness1.snapshot(0, 0),
-			testHarness2.snapshot(0, 0)
-		);
-
-		// with the above state reshuffling, we expect testHarness4 to take the
-		// state of the previous testHarness3 and testHarness1 while testHarness5
-		// will take that of the previous testHarness1
-
-		OperatorSubtaskState initState1 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
-			mergedSnapshot, maxParallelism, 3, 2, 0);
-
-		OperatorSubtaskState initState2 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
-			mergedSnapshot, maxParallelism, 3, 2, 1);
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness4 = createRescalingTestSink(outDir, 2, 0);
-		testHarness4.setup();
-		testHarness4.initializeState(initState1);
-		testHarness4.open();
-
-		// we do not have a length file for part-2-0 because bucket part-2-0
-		// was not "in-progress", but "pending" (its full content is valid).
-		checkLocalFs(outDir, 1, 4, 3, 2);
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness5 = createRescalingTestSink(outDir, 2, 1);
-		testHarness5.setup();
-		testHarness5.initializeState(initState2);
-		testHarness5.open();
-
-		checkLocalFs(outDir, 0, 0, 8, 3);
-	}
-
-	@Test
-	public void testScalingUp() throws Exception {
-		final File outDir = tempFolder.newFolder();
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 2, 0);
-		testHarness1.setup();
-		testHarness1.open();
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 2, 0);
-		testHarness2.setup();
-		testHarness2.open();
-
-		testHarness1.processElement(new StreamRecord<>("test1", 0L));
-		testHarness1.processElement(new StreamRecord<>("test2", 0L));
-
-		checkLocalFs(outDir, 1, 1, 0, 0);
-
-		testHarness2.processElement(new StreamRecord<>("test3", 0L));
-		testHarness2.processElement(new StreamRecord<>("test4", 0L));
-		testHarness2.processElement(new StreamRecord<>("test5", 0L));
-
-		checkLocalFs(outDir, 2, 3, 0, 0);
-
-		// intentionally we snapshot them in the reverse order so that the states are shuffled
-		OperatorSubtaskState mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
-			testHarness2.snapshot(0, 0),
-			testHarness1.snapshot(0, 0)
-		);
-
-		OperatorSubtaskState initState1 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
-			mergedSnapshot, maxParallelism, 2, 3, 0);
-
-		OperatorSubtaskState initState2 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
-			mergedSnapshot, maxParallelism, 2, 3, 1);
-
-		OperatorSubtaskState initState3 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
-			mergedSnapshot, maxParallelism, 2, 3, 2);
-
-		testHarness1 = createRescalingTestSink(outDir, 3, 0);
-		testHarness1.setup();
-		testHarness1.initializeState(initState1);
-		testHarness1.open();
-
-		checkLocalFs(outDir, 1, 1, 3, 1);
-
-		testHarness2 = createRescalingTestSink(outDir, 3, 1);
-		testHarness2.setup();
-		testHarness2.initializeState(initState2);
-		testHarness2.open();
-
-		checkLocalFs(outDir, 0, 0, 5, 2);
-
-		OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2);
-		testHarness3.setup();
-		testHarness3.initializeState(initState3);
-		testHarness3.open();
-
-		checkLocalFs(outDir, 0, 0, 5, 2);
-
-		testHarness1.processElement(new StreamRecord<>("test6", 0));
-		testHarness2.processElement(new StreamRecord<>("test6", 0));
-		testHarness3.processElement(new StreamRecord<>("test6", 0));
-
-		// 3 for the different tasks
-		checkLocalFs(outDir, 3, 0, 5, 2);
-
-		testHarness1.snapshot(1, 0);
-		testHarness2.snapshot(1, 0);
-		testHarness3.snapshot(1, 0);
-
-		testHarness1.close();
-		testHarness2.close();
-		testHarness3.close();
-
-		checkLocalFs(outDir, 0, 3, 5, 2);
-	}
-
-	private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSink(
-		File outDir, int totalParallelism, int taskIdx) throws Exception {
-
-		RollingSink<String> sink = new RollingSink<String>(outDir.getAbsolutePath())
-			.setWriter(new StringWriter<String>())
-			.setBatchSize(5)
-			.setPartPrefix(PART_PREFIX)
-			.setInProgressPrefix("")
-			.setPendingPrefix("")
-			.setValidLengthPrefix("")
-			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
-			.setPendingSuffix(PENDING_SUFFIX)
-			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
-
-		return createTestSink(sink, totalParallelism, taskIdx);
-	}
-
-	private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(
-		RollingSink<T> sink, int totalParallelism, int taskIdx) throws Exception {
-		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), maxParallelism, totalParallelism, taskIdx);
-	}
-
-	private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
-		private static final long serialVersionUID = 1L;
-
-		private volatile boolean running = true;
-
-		private final int numElements;
-
-		public TestSourceFunction(int numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
-			for (int i = 0; i < numElements && running; i++) {
-				ctx.collect(Tuple2.of(i, "message #" + i));
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
-	}
-
-	/**
-	 * This waits on the two multi-shot latches. The latches are triggered in a parallel
-	 * flatMap inside the test topology.
-	 */
-	private static class WaitingTestSourceFunction implements SourceFunction<Tuple2<Integer, String>> {
-		private static final long serialVersionUID = 1L;
-
-		private volatile boolean running = true;
-
-		private final int numElements;
-
-		public WaitingTestSourceFunction(int numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
-			for (int i = 0; i < numElements && running; i++) {
-				if (i % 5 == 0 && i > 0) {
-					// update the clock after "five seconds", so we get 20 seconds in total
-					// with 5 elements in each time window
-					latch1.await();
-					latch2.await();
-					ModifyableClock.setCurrentTime(i * 1000);
-				}
-				ctx.collect(Tuple2.of(i, "message #" + i));
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
-	}
-
-	private static class StreamWriterWithConfigCheck<T> extends StringWriter<T> {
-		private static final long serialVersionUID = 761584896826819477L;
-
-		private String key;
-		private String expect;
-		public StreamWriterWithConfigCheck(String key, String expect) {
-			this.key = key;
-			this.expect = expect;
-		}
-
-		@Override
-		public void open(FileSystem fs, Path path) throws IOException {
-			super.open(fs, path);
-			Assert.assertEquals(expect, fs.getConf().get(key));
-		}
-
-		@Override
-		public StreamWriterWithConfigCheck<T> duplicate() {
-			return new StreamWriterWithConfigCheck<>(key, expect);
-		}
-	}
-
-	private static class OddEvenFilter extends RichFilterFunction<Tuple2<Integer, String>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(Tuple2<Integer, String> value) throws Exception {
-			if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
-				return value.f0 % 2 == 0;
-			} else {
-				return value.f0 % 2 == 1;
-			}
-		}
-	}
-
-	private static class ModifyableClock implements Clock {
-
-		private static volatile long currentTime = 0;
-
-		public static void setCurrentTime(long currentTime) {
-			ModifyableClock.currentTime = currentTime;
-		}
-
-		@Override
-		public long currentTimeMillis() {
-			return currentTime;
-		}
-	}
-}
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
deleted file mode 100644
index 6ae55b6..0000000
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.runtime.security.SecurityConfiguration;
-import org.apache.flink.runtime.security.modules.HadoopModule;
-import org.apache.flink.runtime.testutils.MiniClusterResource;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.SecureTestEnvironment;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.test.util.TestingSecurityContext;
-import org.apache.flink.util.NetUtils;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.http.HttpConfig;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.VersionInfo;
-import org.junit.AfterClass;
-import org.junit.Assume;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
-
-/**
- * Tests for running {@link RollingSinkSecuredITCase} which is an extension of {@link RollingSinkITCase} in secure environment
- * Note: only executed for Hadoop version > 3.x.x.
- */
-public class RollingSinkSecuredITCase extends RollingSinkITCase {
-
-	protected static final Logger LOG = LoggerFactory.getLogger(RollingSinkSecuredITCase.class);
-
-	/**
-	 * Skips all tests if the Hadoop version doesn't match.
-	 * We can't run this test class until HDFS-9213 is fixed which allows a secure DataNode
-	 * to bind to non-privileged ports for testing.
-	 * For now, we skip this test class until Hadoop version 3.x.x.
-	 */
-	private static void skipIfHadoopVersionIsNotAppropriate() {
-		// Skips all tests if the Hadoop version doesn't match
-		String hadoopVersionString = VersionInfo.getVersion();
-		String[] split = hadoopVersionString.split("\\.");
-		if (split.length != 3) {
-			throw new IllegalStateException("Hadoop version was not of format 'X.X.X': " + hadoopVersionString);
-		}
-		Assume.assumeTrue(
-			// check whether we're running Hadoop version >= 3.x.x
-			Integer.parseInt(split[0]) >= 3
-		);
-	}
-
-	/*
-	 * override super class static methods to avoid creating MiniDFS and MiniFlink with wrong configurations
-	 * and out-of-order sequence for secure cluster
-	 */
-	@BeforeClass
-	public static void setup() throws Exception {
-
-		skipIfHadoopVersionIsNotAppropriate();
-
-		LOG.info("starting secure cluster environment for testing");
-
-		dataDir = tempFolder.newFolder();
-
-		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
-
-		SecureTestEnvironment.prepare(tempFolder);
-
-		populateSecureConfigurations();
-
-		Configuration flinkConfig = new Configuration();
-		flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB,
-				SecureTestEnvironment.getTestKeytab());
-		flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL,
-				SecureTestEnvironment.getHadoopServicePrincipal());
-
-		SecurityConfiguration ctx =
-			new SecurityConfiguration(
-				flinkConfig,
-				Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, conf)));
-		try {
-			TestingSecurityContext.install(ctx, SecureTestEnvironment.getClientSecurityConfigurationMap());
-		} catch (Exception e) {
-			throw new RuntimeException("Exception occurred while setting up secure test context. Reason: {}", e);
-		}
-
-		File hdfsSiteXML = new File(dataDir.getAbsolutePath() + "/hdfs-site.xml");
-
-		FileWriter writer = new FileWriter(hdfsSiteXML);
-		conf.writeXml(writer);
-		writer.flush();
-		writer.close();
-
-		Map<String, String> map = new HashMap<String, String>(System.getenv());
-		map.put("HADOOP_CONF_DIR", hdfsSiteXML.getParentFile().getAbsolutePath());
-		TestBaseUtils.setEnv(map);
-
-		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
-		builder.checkDataNodeAddrConfig(true);
-		builder.checkDataNodeHostConfig(true);
-		hdfsCluster = builder.build();
-
-		dfs = hdfsCluster.getFileSystem();
-
-		hdfsURI = "hdfs://"
-				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
-				+ "/";
-
-		Configuration configuration = startSecureFlinkClusterWithRecoveryModeEnabled();
-
-		miniClusterResource = new MiniClusterResource(
-			new MiniClusterResourceConfiguration.Builder()
-				.setConfiguration(configuration)
-				.setNumberTaskManagers(1)
-				.setNumberSlotsPerTaskManager(4)
-				.build());
-
-		miniClusterResource.before();
-	}
-
-	@AfterClass
-	public static void teardown() throws Exception {
-		LOG.info("tearing down secure cluster environment");
-
-		if (hdfsCluster != null) {
-			hdfsCluster.shutdown();
-		}
-
-		if (miniClusterResource != null) {
-			miniClusterResource.after();
-			miniClusterResource = null;
-		}
-
-		SecureTestEnvironment.cleanup();
-	}
-
-	private static void populateSecureConfigurations() {
-
-		String dataTransferProtection = "authentication";
-
-		SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
-		conf.set(DFS_NAMENODE_USER_NAME_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
-		conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, SecureTestEnvironment.getTestKeytab());
-		conf.set(DFS_DATANODE_USER_NAME_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
-		conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, SecureTestEnvironment.getTestKeytab());
-		conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, SecureTestEnvironment.getHadoopServicePrincipal());
-
-		conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
-
-		conf.set("dfs.data.transfer.protection", dataTransferProtection);
-
-		conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTP_ONLY.name());
-
-		conf.set(DFS_ENCRYPT_DATA_TRANSFER_KEY, "false");
-
-		conf.setInt("dfs.datanode.socket.write.timeout", 0);
-
-		/*
-		 * We ae setting the port number to privileged port - see HDFS-9213
-		 * This requires the user to have root privilege to bind to the port
-		 * Use below command (ubuntu) to set privilege to java process for the
-		 * bind() to work if the java process is not running as root.
-		 * setcap 'cap_net_bind_service=+ep' /path/to/java
-		 */
-		conf.set(DFS_DATANODE_ADDRESS_KEY, "localhost:1002");
-		conf.set(DFS_DATANODE_HOST_NAME_KEY, "localhost");
-		conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "localhost:1003");
-	}
-
-	private static Configuration startSecureFlinkClusterWithRecoveryModeEnabled() {
-		try {
-			LOG.info("Starting Flink and ZK in secure mode");
-
-			dfs.mkdirs(new Path("/flink/checkpoints"));
-			dfs.mkdirs(new Path("/flink/recovery"));
-
-			final Configuration result = new Configuration();
-
-			result.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
-			result.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
-			result.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-			result.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
-			result.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
-			result.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
-			result.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
-
-			SecureTestEnvironment.populateFlinkSecureConfigurations(result);
-
-			return result;
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	/* For secure cluster testing, it is enough to run only one test and override below test methods
-	 * to keep the overall build time minimal
-	 */
-	@Override
-	public void testNonRollingSequenceFileWithoutCompressionWriter() throws Exception {}
-
-	@Override
-	public void testNonRollingSequenceFileWithCompressionWriter() throws Exception {}
-
-	@Override
-	public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws Exception {}
-
-	@Override
-	public void testNonRollingAvroKeyValueWithCompressionWriter() throws Exception {}
-
-	@Override
-	public void testDateTimeRollingStringWriter() throws Exception {}
-
-}