You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:05 UTC

[15/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
deleted file mode 100644
index cf2c373..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ /dev/null
@@ -1,1082 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs.bucketing;
-
-import org.apache.commons.lang3.time.StopWatch;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.connectors.fs.Clock;
-import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
-import org.apache.flink.streaming.connectors.fs.StringWriter;
-import org.apache.flink.streaming.connectors.fs.Writer;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.Iterator;
-
-/**
- * Sink that emits its input elements to {@link org.apache.hadoop.fs.FileSystem} files within
- * buckets. This is integrated with the checkpointing mechanism to provide exactly once semantics.
- *
- * <p>
- * When creating the sink a {@code basePath} must be specified. The base directory contains
- * one directory for every bucket. The bucket directories themselves contain several part files,
- * one for each parallel subtask of the sink. These part files contain the actual output data.
- *
- * <p>
- * The sink uses a {@link Bucketer} to determine in which bucket directory each element should
- * be written to inside the base directory. The {@code Bucketer} can, for example, use time or
- * a property of the element to determine the bucket directory. The default {@code Bucketer} is a
- * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify
- * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}. For example, use the
- * {@link BasePathBucketer} if you don't want to have buckets but still want to write part-files
- * in a fault-tolerant way.
- *
- * <p>
- * The filenames of the part files contain the part prefix, the parallel subtask index of the sink
- * and a rolling counter. For example the file {@code "part-1-17"} contains the data from
- * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. Per default
- * the part prefix is {@code "part"} but this can be configured using {@link #setPartPrefix(String)}.
- * When a part file becomes bigger than the user-specified batch size the current part file is closed,
- * the part counter is increased and a new part file is created. The batch size defaults to {@code 384MB},
- * this can be configured using {@link #setBatchSize(long)}.
- *
- * <p>
- * In some scenarios, the open buckets are required to change based on time. In these cases, the sink
- * needs to determine when a bucket has become inactive, in order to flush and close the part file.
- * To support this there are two configurable settings:
- * <ol>
- *     <li>the frequency to check for inactive buckets, configured by {@link #setInactiveBucketCheckInterval(long)},
- *     and</li>
- *     <li>the minimum amount of time a bucket has to not receive any data before it is considered inactive,
- *     configured by {@link #setInactiveBucketThreshold(long)}</li>
- * </ol>
- * Both of these parameters default to {@code 60,000 ms}, or {@code 1 min}.
- *
- * <p>
- * Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
- * The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
- * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
- * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
- * pending files will be moved to {@code finished}.
- *
- * <p>
- * If case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it
- * had when that last successful checkpoint occurred. To this end, when restoring, the restored files in {@code pending}
- * state are transferred into the {@code finished} state while any {@code in-progress} files are rolled back, so that
- * they do not contain data that arrived after the checkpoint from which we restore. If the {@code FileSystem} supports
- * the {@code truncate()} method this will be used to reset the file back to its previous state. If not, a special
- * file with the same name as the part file and the suffix {@code ".valid-length"} will be created that contains the
- * length up to which the file contains valid data. When reading the file, it must be ensured that it is only read up
- * to that point. The prefixes and suffixes for the different file states and valid-length files can be configured
- * using the adequate setter method, e.g. {@link #setPendingSuffix(String)}.
- *
- * <p>
- * <b>NOTE:</b>
- * <ol>
- *     <li>
- *         If checkpointing is not enabled the pending files will never be moved to the finished state. In that case,
- *         the pending suffix/prefix can be set to {@code ""} to make the sink work in a non-fault-tolerant way but
- *         still provide output without prefixes and suffixes.
- *     </li>
- *     <li>
- *         The part files are written using an instance of {@link Writer}. By default, a
- *         {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result
- *         of {@code toString()} for every element, separated by newlines. You can configure the writer using  the
- *         {@link #setWriter(Writer)}. For example, {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter}
- *         can be used to write Hadoop {@code SequenceFiles}.
- *     </li>
- * </ol>
- *
- * <p>
- * Example:
- * <pre>{@code
- *     new BucketingSink<Tuple2<IntWritable, Text>>(outPath)
- *         .setWriter(new SequenceFileWriter<IntWritable, Text>())
- *         .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")
- * }</pre>
- *
- * This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
- *
- * @see DateTimeBucketer
- * @see StringWriter
- * @see SequenceFileWriter
- *
- * @param <T> Type of the elements emitted by this sink
- */
-public class BucketingSink<T>
-		extends RichSinkFunction<T>
-		implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
-
-	private static final long serialVersionUID = 1L;
-
-	private static Logger LOG = LoggerFactory.getLogger(BucketingSink.class);
-
-	// --------------------------------------------------------------------------------------------
-	//  User configuration values
-	// --------------------------------------------------------------------------------------------
-	// These are initialized with some defaults but are meant to be changeable by the user
-
-	/**
-	 * The default maximum size of part files (currently {@code 384 MB}).
-	 */
-	private final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
-
-	/**
-	 * The default time between checks for inactive buckets. By default, {60 sec}.
-	 */
-	private final long DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS = 60 * 1000L;
-
-	/**
-	 * The default threshold (in {@code ms}) for marking a bucket as inactive and
-	 * closing its part files. By default, {60 sec}.
-	 */
-	private final long DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS = 60 * 1000L;
-
-	/**
-	 * The suffix for {@code in-progress} part files. These are files we are
-	 * currently writing to, but which were not yet confirmed by a checkpoint.
-	 */
-	private final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
-
-	/**
-	 * The prefix for {@code in-progress} part files. These are files we are
-	 * currently writing to, but which were not yet confirmed by a checkpoint.
-	 */
-	private final String DEFAULT_IN_PROGRESS_PREFIX = "_";
-
-	/**
-	 * The suffix for {@code pending} part files. These are closed files that we are
-	 * not currently writing to (inactive or reached {@link #batchSize}), but which
-	 * were not yet confirmed by a checkpoint.
-	 */
-	private final String DEFAULT_PENDING_SUFFIX = ".pending";
-
-	/**
-	 * The prefix for {@code pending} part files. These are closed files that we are
-	 * not currently writing to (inactive or reached {@link #batchSize}), but which
-	 * were not yet confirmed by a checkpoint.
-	 */
-	private final String DEFAULT_PENDING_PREFIX = "_";
-
-	/**
-	 * When {@code truncate()} is not supported by the used {@link FileSystem}, we create
-	 * a file along the part file with this suffix that contains the length up to which
-	 * the part file is valid.
-	 */
-	private final String DEFAULT_VALID_SUFFIX = ".valid-length";
-
-	/**
-	 * When {@code truncate()} is not supported by the used {@link FileSystem}, we create
-	 * a file along the part file with this preffix that contains the length up to which
-	 * the part file is valid.
-	 */
-	private final String DEFAULT_VALID_PREFIX = "_";
-
-	/**
-	 * The default prefix for part files.
-	 */
-	private final String DEFAULT_PART_REFIX = "part";
-
-	/**
-	 * The default timeout for asynchronous operations such as recoverLease and truncate (in {@code ms}).
-	 */
-	private final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
-
-
-	/**
-	 * The base {@code Path} that stores all bucket directories.
-	 */
-	private final String basePath;
-
-	/**
-	 * The {@code Bucketer} that is used to determine the path of bucket directories.
-	 */
-	private Bucketer<T> bucketer;
-
-	/**
-	 * We have a template and call duplicate() for each parallel writer in open() to get the actual
-	 * writer that is used for the part files.
-	 */
-	private Writer<T> writerTemplate;
-
-	private long batchSize = DEFAULT_BATCH_SIZE;
-	private long inactiveBucketCheckInterval = DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS;
-	private long inactiveBucketThreshold = DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS;
-
-	// These are the actually configured prefixes/suffixes
-	private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
-	private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX;
-
-	private String pendingSuffix = DEFAULT_PENDING_SUFFIX;
-	private String pendingPrefix = DEFAULT_PENDING_PREFIX;
-
-	private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
-	private String validLengthPrefix= DEFAULT_VALID_PREFIX;
-
-	private String partPrefix = DEFAULT_PART_REFIX;
-
-	/**
-	 * The timeout for asynchronous operations such as recoverLease and truncate (in {@code ms}).
-	 */
-	private long asyncTimeout = DEFAULT_ASYNC_TIMEOUT_MS;
-
-	// --------------------------------------------------------------------------------------------
-	//  Internal fields (not configurable by user)
-	// -------------------------------------------�-------------------------------------------------
-
-	/**
-	 * We use reflection to get the .truncate() method, this is only available starting with Hadoop 2.7
-	 */
-	private transient Method refTruncate;
-
-	/**
-	 * The state object that is handled by Flink from snapshot/restore. This contains state for
-	 * every open bucket: the current in-progress part file path, its valid length and the pending part files.
-	 */
-	private transient State<T> state;
-
-	private transient ListState<State<T>> restoredBucketStates;
-
-	/**
-	 * User-defined FileSystem parameters
-	 */
-	private Configuration fsConfig;
-
-	/**
-	 * The FileSystem reference.
-	 */
-	private transient FileSystem fs;
-
-	private transient Clock clock;
-
-	private transient ProcessingTimeService processingTimeService;
-
-	/**
-	 * Creates a new {@code BucketingSink} that writes files to the given base directory.
-	 *
-	 * <p>
-	 * This uses a{@link DateTimeBucketer} as {@link Bucketer} and a {@link StringWriter} has writer.
-	 * The maximum bucket size is set to 384 MB.
-	 *
-	 * @param basePath The directory to which to write the bucket files.
-	 */
-	public BucketingSink(String basePath) {
-		this.basePath = basePath;
-		this.bucketer = new DateTimeBucketer<>();
-		this.writerTemplate = new StringWriter<>();
-	}
-
-	/**
-	 * Specify a custom {@code Configuration} that will be used when creating
-	 * the {@link FileSystem} for writing.
-	 */
-	public BucketingSink<T> setFSConfig(Configuration config) {
-		this.fsConfig = new Configuration();
-		fsConfig.addAll(config);
-		return this;
-	}
-
-	/**
-	 * Specify a custom {@code Configuration} that will be used when creating
-	 * the {@link FileSystem} for writing.
-	 */
-	public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
-		this.fsConfig = new Configuration();
-		for(Map.Entry<String, String> entry : config) {
-			fsConfig.setString(entry.getKey(), entry.getValue());
-		}
-		return this;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
-		if (this.writerTemplate instanceof InputTypeConfigurable) {
-			((InputTypeConfigurable) writerTemplate).setInputType(type, executionConfig);
-		}
-	}
-
-	@Override
-	public void initializeState(FunctionInitializationContext context) throws Exception {
-		Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized.");
-
-		initFileSystem();
-
-		if (this.refTruncate == null) {
-			this.refTruncate = reflectTruncate(fs);
-		}
-
-		OperatorStateStore stateStore = context.getOperatorStateStore();
-		restoredBucketStates = stateStore.getSerializableListState("bucket-states");
-
-		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-		if (context.isRestored()) {
-			LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
-
-			for (State<T> recoveredState : restoredBucketStates.get()) {
-				handleRestoredBucketState(recoveredState);
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("{} idx {} restored {}", getClass().getSimpleName(), subtaskIndex, recoveredState);
-				}
-			}
-		} else {
-			LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
-		}
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-
-		state = new State<>();
-
-		processingTimeService =
-				((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
-
-		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
-
-		processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
-
-		this.clock = new Clock() {
-			@Override
-			public long currentTimeMillis() {
-				return processingTimeService.getCurrentProcessingTime();
-			}
-		};
-	}
-
-	/**
-	 * Create a file system with the user-defined {@code HDFS} configuration.
-	 * @throws IOException
-	 */
-	private void initFileSystem() throws IOException {
-		if (fs != null) {
-			return;
-		}
-		org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
-		if (fsConfig != null) {
-			String disableCacheName = String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme());
-			hadoopConf.setBoolean(disableCacheName, true);
-			for (String key : fsConfig.keySet()) {
-				hadoopConf.set(key, fsConfig.getString(key, null));
-			}
-		}
-
-		fs = new Path(basePath).getFileSystem(hadoopConf);
-	}
-
-	@Override
-	public void close() throws Exception {
-		for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
-			closeCurrentPartFile(entry.getValue());
-		}
-	}
-
-	@Override
-	public void invoke(T value) throws Exception {
-		Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);
-
-		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
-
-		BucketState<T> bucketState = state.getBucketState(bucketPath);
-		if (bucketState == null) {
-			bucketState = new BucketState<>(currentProcessingTime);
-			state.addBucketState(bucketPath, bucketState);
-		}
-
-		if (shouldRoll(bucketState)) {
-			openNewPartFile(bucketPath, bucketState);
-		}
-
-		bucketState.writer.write(value);
-		bucketState.lastWrittenToTime = currentProcessingTime;
-	}
-
-	/**
-	 * Returns {@code true} if the current {@code part-file} should be closed and a new should be created.
-	 * This happens if:
-	 * <ol>
-	 *     <li>no file is created yet for the task to write to, or</li>
-	 *     <li>the current file has reached the maximum bucket size.</li>
-	 * </ol>
-	 */
-	private boolean shouldRoll(BucketState<T> bucketState) throws IOException {
-		boolean shouldRoll = false;
-		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-		if (!bucketState.isWriterOpen) {
-			shouldRoll = true;
-			LOG.debug("BucketingSink {} starting new bucket.", subtaskIndex);
-		} else {
-			long writePosition = bucketState.writer.getPos();
-			if (writePosition > batchSize) {
-				shouldRoll = true;
-				LOG.debug(
-					"BucketingSink {} starting new bucket because file position {} is above batch size {}.",
-					subtaskIndex,
-					writePosition,
-					batchSize);
-			}
-		}
-		return shouldRoll;
-	}
-
-	@Override
-	public void onProcessingTime(long timestamp) throws Exception {
-		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
-
-		checkForInactiveBuckets(currentProcessingTime);
-
-		processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
-	}
-
-	/**
-	 * Checks for inactive buckets, and closes them. Buckets are considered inactive if they have not been
-	 * written to for a period greater than {@code inactiveBucketThreshold} ms. This enables in-progress
-	 * files to be moved to the pending state and be finalised on the next checkpoint.
-	 */
-	private void checkForInactiveBuckets(long currentProcessingTime) throws Exception {
-
-		synchronized (state.bucketStates) {
-			for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
-				if (entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold) {
-					LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.",
-						getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold);
-					closeCurrentPartFile(entry.getValue());
-				}
-			}
-		}
-	}
-
-	/**
-	 * Closes the current part file and opens a new one with a new bucket path, as returned by the
-	 * {@link Bucketer}. If the bucket is not new, then this will create a new file with the same path
-	 * as its predecessor, but with an increased rolling counter (see {@link BucketingSink}.
-	 */
-	private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws Exception {
-		closeCurrentPartFile(bucketState);
-
-		if (!fs.exists(bucketPath)) {
-			try {
-				if (fs.mkdirs(bucketPath)) {
-					LOG.debug("Created new bucket directory: {}", bucketPath);
-				}
-			} catch (IOException e) {
-				throw new RuntimeException("Could not create new bucket path.", e);
-			}
-		}
-
-		// The following loop tries different partCounter values in ascending order until it reaches the minimum
-		// that is not yet used. This works since there is only one parallel subtask that tries names with this
-		// subtask id. Otherwise we would run into concurrency issues here. This is aligned with the way we now
-		// clean the base directory in case of rescaling.
-
-		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-		Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
-		while (fs.exists(partPath) ||
-				fs.exists(getPendingPathFor(partPath)) ||
-				fs.exists(getInProgressPathFor(partPath))) {
-			bucketState.partCounter++;
-			partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
-		}
-
-		// increase, so we don't have to check for this name next time
-		bucketState.partCounter++;
-
-		LOG.debug("Next part path is {}", partPath.toString());
-		bucketState.currentFile = partPath.toString();
-
-		Path inProgressPath = getInProgressPathFor(partPath);
-		if (bucketState.writer == null) {
-			bucketState.writer = writerTemplate.duplicate();
-		}
-
-		bucketState.writer.open(fs, inProgressPath);
-		bucketState.isWriterOpen = true;
-	}
-
-	/**
-	 * Closes the current part file and moves it from the in-progress state to the pending state.
-	 */
-	private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception {
-		if (bucketState.isWriterOpen) {
-			bucketState.writer.close();
-			bucketState.isWriterOpen = false;
-		}
-
-		if (bucketState.currentFile != null) {
-			Path currentPartPath = new Path(bucketState.currentFile);
-			Path inProgressPath = getInProgressPathFor(currentPartPath);
-			Path pendingPath = getPendingPathFor(currentPartPath);
-
-			fs.rename(inProgressPath, pendingPath);
-			LOG.debug("Moving in-progress bucket {} to pending file {}",
-				inProgressPath,
-				pendingPath);
-			bucketState.pendingFiles.add(currentPartPath.toString());
-			bucketState.currentFile = null;
-		}
-	}
-
-	/**
-	 * Gets the truncate() call using reflection.
-	 * <p>
-	 * <b>NOTE:</b> This code comes from Flume.
-	 */
-	private Method reflectTruncate(FileSystem fs) {
-		Method m = null;
-		if(fs != null) {
-			Class<?> fsClass = fs.getClass();
-			try {
-				m = fsClass.getMethod("truncate", Path.class, long.class);
-			} catch (NoSuchMethodException ex) {
-				LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
-					" and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-				return null;
-			}
-
-			// verify that truncate actually works
-			FSDataOutputStream outputStream;
-			Path testPath = new Path(UUID.randomUUID().toString());
-			try {
-				outputStream = fs.create(testPath);
-				outputStream.writeUTF("hello");
-				outputStream.close();
-			} catch (IOException e) {
-				LOG.error("Could not create file for checking if truncate works.", e);
-				throw new RuntimeException("Could not create file for checking if truncate works.", e);
-			}
-
-			try {
-				m.invoke(fs, testPath, 2);
-			} catch (IllegalAccessException | InvocationTargetException e) {
-				LOG.debug("Truncate is not supported.", e);
-				m = null;
-			}
-
-			try {
-				fs.delete(testPath, false);
-			} catch (IOException e) {
-				LOG.error("Could not delete truncate test file.", e);
-				throw new RuntimeException("Could not delete truncate test file.", e);
-			}
-		}
-		return m;
-	}
-
-	private Path getPendingPathFor(Path path) {
-		return new Path(path.getParent(), pendingPrefix + path.getName()).suffix(pendingSuffix);
-	}
-
-	private Path getInProgressPathFor(Path path) {
-		return new Path(path.getParent(), inProgressPrefix + path.getName()).suffix(inProgressSuffix);
-	}
-
-	private Path getValidLengthPathFor(Path path) {
-		return new Path(path.getParent(), validLengthPrefix + path.getName()).suffix(validLengthSuffix);
-	}
-
-	@Override
-	public void notifyCheckpointComplete(long checkpointId) throws Exception {
-		synchronized (state.bucketStates) {
-
-			Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt = state.bucketStates.entrySet().iterator();
-			while (bucketStatesIt.hasNext()) {
-				BucketState<T> bucketState = bucketStatesIt.next().getValue();
-				synchronized (bucketState.pendingFilesPerCheckpoint) {
-
-					Iterator<Map.Entry<Long, List<String>>> pendingCheckpointsIt =
-						bucketState.pendingFilesPerCheckpoint.entrySet().iterator();
-
-					while (pendingCheckpointsIt.hasNext()) {
-
-						Map.Entry<Long, List<String>> entry = pendingCheckpointsIt.next();
-						Long pastCheckpointId = entry.getKey();
-						List<String> pendingPaths = entry.getValue();
-
-						if (pastCheckpointId <= checkpointId) {
-							LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
-
-							for (String filename : pendingPaths) {
-								Path finalPath = new Path(filename);
-								Path pendingPath = getPendingPathFor(finalPath);
-
-								fs.rename(pendingPath, finalPath);
-								LOG.debug(
-									"Moving pending file {} to final location having completed checkpoint {}.",
-									pendingPath,
-									pastCheckpointId);
-							}
-							pendingCheckpointsIt.remove();
-						}
-					}
-
-					if (!bucketState.isWriterOpen &&
-						bucketState.pendingFiles.isEmpty() &&
-						bucketState.pendingFilesPerCheckpoint.isEmpty()) {
-
-						// We've dealt with all the pending files and the writer for this bucket is not currently open.
-						// Therefore this bucket is currently inactive and we can remove it from our state.
-						bucketStatesIt.remove();
-					}
-				}
-			}
-		}
-	}
-
-	@Override
-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
-		Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
-
-		restoredBucketStates.clear();
-
-		synchronized (state.bucketStates) {
-			int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
-
-			for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
-				BucketState<T> bucketState = bucketStateEntry.getValue();
-
-				if (bucketState.isWriterOpen) {
-					bucketState.currentFileValidLength = bucketState.writer.flush();
-				}
-
-				synchronized (bucketState.pendingFilesPerCheckpoint) {
-					bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
-				}
-				bucketState.pendingFiles = new ArrayList<>();
-			}
-			restoredBucketStates.add(state);
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
-			}
-		}
-	}
-
-	private void handleRestoredBucketState(State<T> restoredState) {
-		Preconditions.checkNotNull(restoredState);
-
-		for (BucketState<T> bucketState : restoredState.bucketStates.values()) {
-
-			// we can clean all the pending files since they were renamed to
-			// final files after this checkpoint was successful
-			// (we re-start from the last **successful** checkpoint)
-			bucketState.pendingFiles.clear();
-
-			if (bucketState.currentFile != null) {
-
-				// We were writing to a file when the last checkpoint occurred. This file can either
-				// be still in-progress or became a pending file at some point after the checkpoint.
-				// Either way, we have to truncate it back to a valid state (or write a .valid-length
-				// file that specifies up to which length it is valid) and rename it to the final name
-				// before starting a new bucket file.
-
-				Path partPath = new Path(bucketState.currentFile);
-				try {
-					Path partPendingPath = getPendingPathFor(partPath);
-					Path partInProgressPath = getInProgressPathFor(partPath);
-
-					if (fs.exists(partPendingPath)) {
-						LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
-						// has been moved to pending in the mean time, rename to final location
-						fs.rename(partPendingPath, partPath);
-					} else if (fs.exists(partInProgressPath)) {
-						LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
-						// it was still in progress, rename to final path
-						fs.rename(partInProgressPath, partPath);
-					} else if (fs.exists(partPath)) {
-						LOG.debug("In-Progress file {} was already moved to final location {}.", bucketState.currentFile, partPath);
-					} else {
-						LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
-							"it was moved to final location by a previous snapshot restore", bucketState.currentFile);
-					}
-
-					// We use reflection to get the .truncate() method, this
-					// is only available starting with Hadoop 2.7
-					if (this.refTruncate == null) {
-						this.refTruncate = reflectTruncate(fs);
-					}
-
-					// truncate it or write a ".valid-length" file to specify up to which point it is valid
-					if (refTruncate != null) {
-						LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength);
-						// some-one else might still hold the lease from a previous try, we are
-						// recovering, after all ...
-						if (fs instanceof DistributedFileSystem) {
-							DistributedFileSystem dfs = (DistributedFileSystem) fs;
-							LOG.debug("Trying to recover file lease {}", partPath);
-							dfs.recoverLease(partPath);
-							boolean isclosed = dfs.isFileClosed(partPath);
-							StopWatch sw = new StopWatch();
-							sw.start();
-							while (!isclosed) {
-								if (sw.getTime() > asyncTimeout) {
-									break;
-								}
-								try {
-									Thread.sleep(500);
-								} catch (InterruptedException e1) {
-									// ignore it
-								}
-								isclosed = dfs.isFileClosed(partPath);
-							}
-						}
-						Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
-						if (!truncated) {
-							LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
-
-							// we must wait for the asynchronous truncate operation to complete
-							StopWatch sw = new StopWatch();
-							sw.start();
-							long newLen = fs.getFileStatus(partPath).getLen();
-							while (newLen != bucketState.currentFileValidLength) {
-								if (sw.getTime() > asyncTimeout) {
-									break;
-								}
-								try {
-									Thread.sleep(500);
-								} catch (InterruptedException e1) {
-									// ignore it
-								}
-								newLen = fs.getFileStatus(partPath).getLen();
-							}
-							if (newLen != bucketState.currentFileValidLength) {
-								throw new RuntimeException("Truncate did not truncate to right length. Should be " + bucketState.currentFileValidLength + " is " + newLen + ".");
-							}
-						}
-					} else {
-						LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
-						Path validLengthFilePath = getValidLengthPathFor(partPath);
-						if (!fs.exists(validLengthFilePath)) {
-							FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
-							lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
-							lengthFileOut.close();
-						}
-					}
-
-					// Now that we've restored the bucket to a valid state, reset the current file info
-					bucketState.currentFile = null;
-					bucketState.currentFileValidLength = -1;
-					bucketState.isWriterOpen = false;
-				} catch (IOException e) {
-					LOG.error("Error while restoring BucketingSink state.", e);
-					throw new RuntimeException("Error while restoring BucketingSink state.", e);
-				} catch (InvocationTargetException | IllegalAccessException e) {
-					LOG.error("Could not invoke truncate.", e);
-					throw new RuntimeException("Could not invoke truncate.", e);
-				}
-			}
-
-			// Move files that are confirmed by a checkpoint but did not get moved to final location
-			// because the checkpoint notification did not happen before a failure
-
-			LOG.debug("Moving pending files to final location on restore.");
-
-			Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
-			for (Long pastCheckpointId : pastCheckpointIds) {
-				// All the pending files are buckets that have been completed but are waiting to be renamed
-				// to their final name
-				for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
-					Path finalPath = new Path(filename);
-					Path pendingPath = getPendingPathFor(finalPath);
-
-					try {
-						if (fs.exists(pendingPath)) {
-							LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
-							fs.rename(pendingPath, finalPath);
-						}
-					} catch (IOException e) {
-						LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
-						throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
-					}
-				}
-			}
-
-			synchronized (bucketState.pendingFilesPerCheckpoint) {
-				bucketState.pendingFilesPerCheckpoint.clear();
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Setters for User configuration values
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Sets the maximum bucket size in bytes.
-	 *
-	 * <p>
-	 * When a bucket part file becomes larger than this size a new bucket part file is started and
-	 * the old one is closed. The name of the bucket files depends on the {@link Bucketer}.
-	 *
-	 * @param batchSize The bucket part file size in bytes.
-	 */
-	public BucketingSink<T> setBatchSize(long batchSize) {
-		this.batchSize = batchSize;
-		return this;
-	}
-
-	/**
-	 * Sets the default time between checks for inactive buckets.
-	 *
-	 * @param interval The timeout, in milliseconds.
-	 */
-	public BucketingSink<T> setInactiveBucketCheckInterval(long interval) {
-		this.inactiveBucketCheckInterval = interval;
-		return this;
-	}
-
-	/**
-	 * Sets the default threshold for marking a bucket as inactive and closing its part files.
-	 * Buckets which haven't been written to for at least this period of time become inactive.
-	 *
-	 * @param threshold The timeout, in milliseconds.
-	 */
-	public BucketingSink<T> setInactiveBucketThreshold(long threshold) {
-		this.inactiveBucketThreshold = threshold;
-		return this;
-	}
-
-	/**
-	 * Sets the {@link Bucketer} to use for determining the bucket files to write to.
-	 *
-	 * @param bucketer The bucketer to use.
-	 */
-	public BucketingSink<T> setBucketer(Bucketer<T> bucketer) {
-		this.bucketer = bucketer;
-		return this;
-	}
-
-	/**
-	 * Sets the {@link Writer} to be used for writing the incoming elements to bucket files.
-	 *
-	 * @param writer The {@code Writer} to use.
-	 */
-	public BucketingSink<T> setWriter(Writer<T> writer) {
-		this.writerTemplate = writer;
-		return this;
-	}
-
-	/**
-	 * Sets the suffix of in-progress part files. The default is {@code "in-progress"}.
-	 */
-	public BucketingSink<T> setInProgressSuffix(String inProgressSuffix) {
-		this.inProgressSuffix = inProgressSuffix;
-		return this;
-	}
-
-	/**
-	 * Sets the prefix of in-progress part files. The default is {@code "_"}.
-	 */
-	public BucketingSink<T> setInProgressPrefix(String inProgressPrefix) {
-		this.inProgressPrefix = inProgressPrefix;
-		return this;
-	}
-
-	/**
-	 * Sets the suffix of pending part files. The default is {@code ".pending"}.
-	 */
-	public BucketingSink<T> setPendingSuffix(String pendingSuffix) {
-		this.pendingSuffix = pendingSuffix;
-		return this;
-	}
-
-	/**
-	 * Sets the prefix of pending part files. The default is {@code "_"}.
-	 */
-	public BucketingSink<T> setPendingPrefix(String pendingPrefix) {
-		this.pendingPrefix = pendingPrefix;
-		return this;
-	}
-
-	/**
-	 * Sets the suffix of valid-length files. The default is {@code ".valid-length"}.
-	 */
-	public BucketingSink<T> setValidLengthSuffix(String validLengthSuffix) {
-		this.validLengthSuffix = validLengthSuffix;
-		return this;
-	}
-
-	/**
-	 * Sets the prefix of valid-length files. The default is {@code "_"}.
-	 */
-	public BucketingSink<T> setValidLengthPrefix(String validLengthPrefix) {
-		this.validLengthPrefix = validLengthPrefix;
-		return this;
-	}
-
-	/**
-	 * Sets the prefix of part files.  The default is {@code "part"}.
-	 */
-	public BucketingSink<T> setPartPrefix(String partPrefix) {
-		this.partPrefix = partPrefix;
-		return this;
-	}
-
-	/**
-	 * Disable cleanup of leftover in-progress/pending files when the sink is opened.
-	 *
-	 * <p>
-	 * This should only be disabled if using the sink without checkpoints, to not remove
-	 * the files already in the directory.
-	 *
-	 * @deprecated This option is deprecated and remains only for backwards compatibility.
-	 * We do not clean up lingering files anymore.
-	 */
-	@Deprecated
-	public BucketingSink<T> disableCleanupOnOpen() {
-		return this;
-	}
-
-	/**
-	 * Sets the default timeout for asynchronous operations such as recoverLease and truncate.
-	 *
-	 * @param timeout The timeout, in milliseconds.
-	 */
-	public BucketingSink<T> setAsyncTimeout(long timeout) {
-		this.asyncTimeout = timeout;
-		return this;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Internal Classes
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * This is used during snapshot/restore to keep track of in-progress buckets.
-	 * For each bucket, we maintain a state.
-	 */
-	static final class State<T> implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		/**
-		 * For every bucket directory (key), we maintain a bucket state (value).
-		 */
-		final Map<String, BucketState<T>> bucketStates = new HashMap<>();
-
-		void addBucketState(Path bucketPath, BucketState<T> state) {
-			synchronized (bucketStates) {
-				bucketStates.put(bucketPath.toString(), state);
-			}
-		}
-
-		BucketState<T> getBucketState(Path bucketPath) {
-			synchronized (bucketStates) {
-				return bucketStates.get(bucketPath.toString());
-			}
-		}
-
-		@Override
-		public String toString() {
-			return bucketStates.toString();
-		}
-	}
-
-	/**
-	 * This is used for keeping track of the current in-progress buckets and files that we mark
-	 * for moving from pending to final location after we get a checkpoint-complete notification.
-	 */
-	static final class BucketState<T> implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		/**
-		 * The file that was in-progress when the last checkpoint occurred.
-		 */
-		String currentFile;
-
-		/**
-		 * The valid length of the in-progress file at the time of the last checkpoint.
-		 */
-		long currentFileValidLength = -1;
-
-		/**
-		 * The time this bucket was last written to.
-		 */
-		long lastWrittenToTime;
-
-		/**
-		 * Pending files that accumulated since the last checkpoint.
-		 */
-		List<String> pendingFiles = new ArrayList<>();
-
-		/**
-		 * When doing a checkpoint we move the pending files since the last checkpoint to this map
-		 * with the id of the checkpoint. When we get the checkpoint-complete notification we move
-		 * pending files of completed checkpoints to their final location.
-		 */
-		final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
-
-		/**
-		 * For counting the part files inside a bucket directory. Part files follow the pattern
-		 * {@code "{part-prefix}-{subtask}-{count}"}. When creating new part files we increase the counter.
-		 */
-		private transient int partCounter;
-
-		/**
-		 * Tracks if the writer is currently opened or closed.
-		 */
-		private transient boolean isWriterOpen;
-
-		/**
-		 * The actual writer that we user for writing the part files.
-		 */
-		private transient Writer<T> writer;
-
-		@Override
-		public String toString() {
-			return
-				"In-progress=" + currentFile +
-					" validLength=" + currentFileValidLength +
-					" pendingForNextCheckpoint=" + pendingFiles +
-					" pendingForPrevCheckpoints=" + pendingFilesPerCheckpoint +
-					" lastModified@" + lastWrittenToTime;
-		}
-
-		BucketState(long lastWrittenToTime) {
-			this.lastWrittenToTime = lastWrittenToTime;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
deleted file mode 100644
index b985e14..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs.bucketing;
-
-import org.apache.flink.streaming.connectors.fs.Clock;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-/**
- * A {@link Bucketer} that assigns to buckets based on current system time.
- *
- * <p>
- * The {@code DateTimeBucketer} will create directories of the following form:
- * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
- * that was specified as a base path when creating the
- * {@link BucketingSink}. The {@code dateTimePath}
- * is determined based on the current system time and the user provided format string.
- *
- * <p>
- * {@link SimpleDateFormat} is used to derive a date string from the current system time and
- * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
- * files will have a granularity of hours.
- *
- *
- * <p>
- * Example:
- *
- * <pre>{@code
- *     Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH");
- * }</pre>
- *
- * This will create for example the following bucket path:
- * {@code /base/1976-12-31-14/}
- *
- */
-public class DateTimeBucketer<T> implements Bucketer<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
-
-	private final String formatString;
-
-	private transient SimpleDateFormat dateFormatter;
-
-	/**
-	 * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}.
-	 */
-	public DateTimeBucketer() {
-		this(DEFAULT_FORMAT_STRING);
-	}
-
-	/**
-	 * Creates a new {@code DateTimeBucketer} with the given date/time format string.
-	 *
-	 * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
-	 *                     the bucket path.
-	 */
-	public DateTimeBucketer(String formatString) {
-		this.formatString = formatString;
-
-		this.dateFormatter = new SimpleDateFormat(formatString);
-	}
-
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-
-		this.dateFormatter = new SimpleDateFormat(formatString);
-	}
-
-	@Override
-	public Path getBucketPath(Clock clock, Path basePath, T element) {
-		String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis()));
-		return new Path(basePath + "/" + newDateTimeString);
-	}
-
-	@Override
-	public String toString() {
-		return "DateTimeBucketer{" +
-				"formatString='" + formatString + '\'' +
-				'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties b/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
deleted file mode 100644
index fe60d94..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
deleted file mode 100644
index 36c0d03..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-import com.google.common.collect.Sets;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
-import org.apache.flink.util.NetUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link org.apache.flink.streaming.connectors.fs.RollingSink}.
- *
- * <p>
- * This test only verifies the exactly once behaviour of the sink. Another test tests the
- * rolling behaviour.
- *
- * @deprecated should be removed with the {@link RollingSink}.
- */
-@Deprecated
-public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase {
-
-	final long NUM_STRINGS = 16_000;
-
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-	private static MiniDFSCluster hdfsCluster;
-	private static org.apache.hadoop.fs.FileSystem dfs;
-
-	private static String outPath;
-
-	private static final String PENDING_SUFFIX = ".pending";
-	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
-
-	@BeforeClass
-	public static void createHDFS() throws IOException {
-		Configuration conf = new Configuration();
-
-		File dataDir = tempFolder.newFolder();
-
-		conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
-		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
-		hdfsCluster = builder.build();
-
-		dfs = hdfsCluster.getFileSystem();
-
-		outPath = "hdfs://"
-				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
-				+ "/string-non-rolling-out";
-	}
-
-	@AfterClass
-	public static void destroyHDFS() {
-		if (hdfsCluster != null) {
-			hdfsCluster.shutdown();
-		}
-	}
-
-	@Override
-	public void testProgram(StreamExecutionEnvironment env) {
-		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
-
-		int PARALLELISM = 12;
-
-		env.enableCheckpointing(20);
-		env.setParallelism(PARALLELISM);
-		env.disableOperatorChaining();
-
-		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain();
-
-		DataStream<String> mapped = stream
-				.map(new OnceFailingIdentityMapper(NUM_STRINGS));
-
-		RollingSink<String> sink = new RollingSink<String>(outPath)
-				.setBucketer(new NonRollingBucketer())
-				.setBatchSize(10000)
-				.setValidLengthPrefix("")
-				.setPendingPrefix("")
-				.setPendingSuffix(PENDING_SUFFIX)
-				.setInProgressSuffix(IN_PROGRESS_SUFFIX);
-
-		mapped.addSink(sink);
-
-	}
-
-	@Override
-	public void postSubmit() throws Exception {
-		// We read the files and verify that we have read all the strings. If a valid-length
-		// file exists we only read the file to that point. (This test should work with
-		// FileSystems that support truncate() and with others as well.)
-
-		Pattern messageRegex = Pattern.compile("message (\\d*)");
-
-		// Keep a set of the message IDs that we read. The size must equal the read count and
-		// the NUM_STRINGS. If numRead is bigger than the size of the set we have seen some
-		// elements twice.
-		Set<Integer> readNumbers = Sets.newHashSet();
-
-		HashSet<String> uniqMessagesRead = new HashSet<>();
-		HashSet<String> messagesInCommittedFiles = new HashSet<>();
-
-		RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(
-				outPath), true);
-
-		while (files.hasNext()) {
-			LocatedFileStatus file = files.next();
-
-			if (!file.getPath().toString().endsWith(".valid-length")) {
-				int validLength = (int) file.getLen();
-				if (dfs.exists(file.getPath().suffix(".valid-length"))) {
-					FSDataInputStream inStream = dfs.open(file.getPath().suffix(".valid-length"));
-					String validLengthString = inStream.readUTF();
-					validLength = Integer.parseInt(validLengthString);
-					System.out.println("VALID LENGTH: " + validLength);
-				}
-				FSDataInputStream inStream = dfs.open(file.getPath());
-				byte[] buffer = new byte[validLength];
-				inStream.readFully(0, buffer, 0, validLength);
-				inStream.close();
-
-				ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
-
-				InputStreamReader inStreamReader = new InputStreamReader(bais);
-				BufferedReader br = new BufferedReader(inStreamReader);
-
-				String line = br.readLine();
-				while (line != null) {
-					Matcher matcher = messageRegex.matcher(line);
-					if (matcher.matches()) {
-						uniqMessagesRead.add(line);
-
-						// check that in the committed files there are no duplicates
-						if (!file.getPath().toString().endsWith(IN_PROGRESS_SUFFIX) && !file.getPath().toString().endsWith(PENDING_SUFFIX)) {
-							if (!messagesInCommittedFiles.add(line)) {
-								Assert.fail("Duplicate entry in committed bucket.");
-							}
-						}
-
-						int messageId = Integer.parseInt(matcher.group(1));
-						readNumbers.add(messageId);
-					} else {
-						Assert.fail("Read line does not match expected pattern.");
-					}
-					line = br.readLine();
-				}
-				br.close();
-				inStreamReader.close();
-				bais.close();
-			}
-		}
-
-		// Verify that we read all strings (at-least-once)
-		Assert.assertEquals(NUM_STRINGS, readNumbers.size());
-
-		// Verify that we don't have duplicates (boom!, exactly-once)
-		Assert.assertEquals(NUM_STRINGS, uniqMessagesRead.size());
-	}
-
-	private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> {
-		private static final long serialVersionUID = 1L;
-
-		private static volatile boolean hasFailed = false;
-
-		private final long numElements;
-
-		private long failurePos;
-		private long count;
-
-
-		OnceFailingIdentityMapper(long numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void open(org.apache.flink.configuration.Configuration parameters) throws IOException {
-			long failurePosMin = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-			long failurePosMax = (long) (0.9 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-
-			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
-			count = 0;
-		}
-
-		@Override
-		public String map(String value) throws Exception {
-			count++;
-			if (!hasFailed && count >= failurePos) {
-				hasFailed = true;
-				throw new Exception("Test Failure");
-			}
-
-			return value;
-		}
-	}
-
-	private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String>
-			implements CheckpointedAsynchronously<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final long numElements;
-
-		private int index;
-
-		private volatile boolean isRunning = true;
-
-
-		StringGeneratingSourceFunction(long numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void run(SourceContext<String> ctx) throws Exception {
-			final Object lockingObject = ctx.getCheckpointLock();
-
-			final int step = getRuntimeContext().getNumberOfParallelSubtasks();
-
-			if (index == 0) {
-				index = getRuntimeContext().getIndexOfThisSubtask();
-			}
-
-			while (isRunning && index < numElements) {
-
-				Thread.sleep(1);
-				synchronized (lockingObject) {
-					ctx.collect("message " + index);
-					index += step;
-				}
-			}
-		}
-
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-
-		private static String randomString(StringBuilder bld, Random rnd) {
-			final int len = rnd.nextInt(10) + 5;
-
-			for (int i = 0; i < len; i++) {
-				char next = (char) (rnd.nextInt(20000) + 33);
-				bld.append(next);
-			}
-
-			return bld.toString();
-		}
-
-		@Override
-		public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
-			return index;
-		}
-
-		@Override
-		public void restoreState(Integer state) {
-			index = state;
-		}
-	}
-}