You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:06 UTC

[16/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
deleted file mode 100644
index fc4a35e..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
+++ /dev/null
@@ -1,916 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.commons.lang3.time.StopWatch;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
-import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-/**
- * Sink that emits its input elements to rolling {@link org.apache.hadoop.fs.FileSystem} files. This
- * is integrated with the checkpointing mechanism to provide exactly once semantics.
- *
- * <p>
- * When creating the sink a {@code basePath} must be specified. The base directory contains
- * one directory for every bucket. The bucket directories themselves contain several part files.
- * These contain the actual written data.
- *
- * <p>
- * The sink uses a {@link Bucketer} to determine the name of bucket directories inside the
- * base directory. Whenever the {@code Bucketer} returns a different directory name than
- * it returned before the sink will close the current part files inside that bucket
- * and start the new bucket directory. The default bucketer is a {@link DateTimeBucketer} with
- * date format string {@code ""yyyy-MM-dd--HH"}. You can specify a custom {@code Bucketer}
- * using {@link #setBucketer(Bucketer)}. For example, use
- * {@link NonRollingBucketer} if you don't want to have
- * buckets but still write part files in a fault-tolerant way.
- *
- * <p>
- * The filenames of the part files contain the part prefix, the parallel subtask index of the sink
- * and a rolling counter, for example {@code "part-1-17"}. Per default the part prefix is
- * {@code "part"} but this can be
- * configured using {@link #setPartPrefix(String)}. When a part file becomes bigger
- * than the batch size the current part file is closed, the part counter is increased and
- * a new part file is created. The batch size defaults to {@code 384MB}, this can be configured
- * using {@link #setBatchSize(long)}.
- *
- * <p>
- * Part files can be in one of three states: in-progress, pending or finished. The reason for this
- * is how the sink works together with the checkpointing mechanism to provide exactly-once semantics
- * and fault-tolerance. The part file that is currently being written to is in-progress. Once
- * a part file is closed for writing it becomes pending. When a checkpoint is successful the
- * currently pending files will be moved to finished. If a failure occurs the pending files
- * will be deleted to reset state to the last checkpoint. The data in in-progress files will
- * also have to be rolled back. If the {@code FileSystem} supports the {@code truncate} call
- * this will be used to reset the file back to a previous state. If not, a special file
- * with the same name as the part file and the suffix {@code ".valid-length"} will be written
- * that contains the length up to which the file contains valid data. When reading the file
- * it must be ensured that it is only read up to that point. The prefixes and suffixes for
- * the different file states and valid-length files can be configured, for example with
- * {@link #setPendingSuffix(String)}.
- *
- * <p>
- * Note: If checkpointing is not enabled the pending files will never be moved to the finished state.
- * In that case, the pending suffix/prefix can be set to {@code ""} to make the sink work
- * in a non-fault-tolerant way but still provide output without prefixes and suffixes.
- *
- * <p>
- * The part files are written using an instance of {@link Writer}. By default
- * {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result
- * of {@code toString()} for every element. Separated by newlines. You can configure the writer
- * using {@link #setWriter(Writer)}. For example,
- * {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter} can be used to write
- * Hadoop {@code SequenceFiles}.
- *
- * <p>
- * Example:
- *
- * <pre>{@code
- *     new RollingSink<Tuple2<IntWritable, Text>>(outPath)
- *         .setWriter(new SequenceFileWriter<IntWritable, Text>())
- *         .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")
- * }</pre>
- *
- * This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
- *
- * @see DateTimeBucketer
- * @see StringWriter
- * @see SequenceFileWriter
- *
- * @param <T> Type of the elements emitted by this sink
- *
- * @deprecated use {@link BucketingSink} instead.
- */
-@Deprecated
-public class RollingSink<T> extends RichSinkFunction<T>
-		implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener {
-
-	private static final long serialVersionUID = 1L;
-
-	private static Logger LOG = LoggerFactory.getLogger(RollingSink.class);
-
-
-	// --------------------------------------------------------------------------------------------
-	//  User configuration values
-	// --------------------------------------------------------------------------------------------
-	// These are initialized with some defaults but are meant to be changeable by the user
-
-	/**
-	 * The default maximum size of part files (currently {@code 384 MB}).
-	 */
-	private final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;
-
-	/**
-	 * This is used for part files that we are writing to but which where not yet confirmed
-	 * by a checkpoint.
-	 */
-	private final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";
-
-	/**
-	 * See above, but for prefix
-	 */
-	private final String DEFAULT_IN_PROGRESS_PREFIX = "_";
-
-	/**
-	 * This is used for part files that we are not writing to but which are not yet confirmed by
-	 * checkpoint.
-	 */
-	private final String DEFAULT_PENDING_SUFFIX = ".pending";
-
-	/**
-	 * See above, but for prefix.
-	 */
-	private final String DEFAULT_PENDING_PREFIX = "_";
-
-	/**
-	 * When truncate() is not supported on the used FileSystem we instead write a
-	 * file along the part file with this ending that contains the length up to which
-	 * the part file is valid.
-	 */
-	private final String DEFAULT_VALID_SUFFIX = ".valid-length";
-
-	/**
-	 * See above, but for prefix.
-	 */
-	private final String DEFAULT_VALID_PREFIX = "_";
-
-	/**
-	 * The default prefix for part files.
-	 */
-	private final String DEFAULT_PART_REFIX = "part";
-
-	/**
-	 * The default timeout for asynchronous operations such as recoverLease and truncate. In
-	 * milliseconds.
-	 */
-	private final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
-
-
-	/**
-	 * The base {@code Path} that stores all bucket directories.
-	 */
-	private final String basePath;
-
-	/**
-	 * The {@code Bucketer} that is used to determine the path of bucket directories.
-	 */
-	private Bucketer bucketer;
-
-	/**
-	 * We have a template and call duplicate() for each parallel writer in open() to get the actual
-	 * writer that is used for the part files.
-	 */
-	private Writer<T> writerTemplate;
-
-	/**
-	 * The actual writer that we user for writing the part files.
-	 */
-	private Writer<T> writer;
-
-	/**
-	 * Maximum size of part files. If files exceed this we close and create a new one in the same
-	 * bucket directory.
-	 */
-	private long batchSize;
-
-	// These are the actually configured prefixes/suffixes
-	private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
-	private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX;
-
-	private String pendingSuffix = DEFAULT_PENDING_SUFFIX;
-	private String pendingPrefix = DEFAULT_PENDING_PREFIX;
-
-	private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
-	private String validLengthPrefix= DEFAULT_VALID_PREFIX;
-
-	private String partPrefix = DEFAULT_PART_REFIX;
-
-	/**
-	 * The timeout for asynchronous operations such as recoverLease and truncate. In
-	 * milliseconds.
-	 */
-	private long asyncTimeout = DEFAULT_ASYNC_TIMEOUT_MS;
-
-	// --------------------------------------------------------------------------------------------
-	//  Internal fields (not configurable by user)
-	// --------------------------------------------------------------------------------------------
-
-
-	/**
-	 * The part file that we are currently writing to.
-	 */
-	private transient Path currentPartPath;
-
-	/**
-	 * The bucket directory that we are currently filling.
-	 */
-	private transient Path currentBucketDirectory;
-
-	/**
-	 * For counting the part files inside a bucket directory. Part files follow the patter
-	 * {@code "{part-prefix}-{subtask}-{count}"}. When creating new part files we increase the counter.
-	 */
-	private transient int partCounter;
-
-	/**
-	 * Tracks if the writer is currently opened or closed.
-	 */
-	private transient boolean isWriterOpen;
-
-	/**
-	 * We use reflection to get the .truncate() method, this is only available starting with
-	 * Hadoop 2.7
-	 */
-	private transient Method refTruncate;
-
-	/**
-	 * The state object that is handled by flink from snapshot/restore. In there we store the
-	 * current part file path, the valid length of the in-progress files and pending part files.
-	 */
-	private transient BucketState bucketState;
-
-	private transient ListState<BucketState> restoredBucketStates;
-
-	/**
-	 * User-defined FileSystem parameters.
-     */
-	private Configuration fsConfig;
-
-	/**
-	 * The FileSystem reference.
-	 */
-	private transient FileSystem fs;
-	/**
-	 * Creates a new {@code RollingSink} that writes files to the given base directory.
-	 *
-	 * <p>
-	 * This uses a{@link DateTimeBucketer} as bucketer and a {@link StringWriter} has writer.
-	 * The maximum bucket size is set to 384 MB.
-	 *
-	 * @param basePath The directory to which to write the bucket files.
-	 */
-	public RollingSink(String basePath) {
-		this.basePath = basePath;
-		this.bucketer = new DateTimeBucketer();
-		this.batchSize = DEFAULT_BATCH_SIZE;
-		this.writerTemplate = new StringWriter<>();
-	}
-
-	/**
-	 * Specify a custom {@code Configuration} that will be used when creating
-	 * the {@link FileSystem} for writing.
-	 */
-	public RollingSink<T> setFSConfig(Configuration config) {
-		this.fsConfig = new Configuration();
-		fsConfig.addAll(config);
-		return this;
-	}
-
-	/**
-	 * Specify a custom {@code Configuration} that will be used when creating
-	 * the {@link FileSystem} for writing.
-	 */
-	public RollingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
-		this.fsConfig = new Configuration();
-		for(Map.Entry<String, String> entry : config) {
-			fsConfig.setString(entry.getKey(), entry.getValue());
-		}
-		return this;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
-		if (this.writerTemplate instanceof InputTypeConfigurable) {
-			((InputTypeConfigurable) writerTemplate).setInputType(type, executionConfig);
-		}
-	}
-
-	@Override
-	public void initializeState(FunctionInitializationContext context) throws Exception {
-		Preconditions.checkArgument(this.restoredBucketStates == null,
-			"The " + getClass().getSimpleName() + " has already been initialized.");
-
-		initFileSystem();
-
-		if (this.refTruncate == null) {
-			this.refTruncate = reflectTruncate(fs);
-		}
-
-		OperatorStateStore stateStore = context.getOperatorStateStore();
-		restoredBucketStates = stateStore.getSerializableListState("rolling-states");
-
-		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-		if (context.isRestored()) {
-			LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
-
-			for (BucketState bucketState : restoredBucketStates.get()) {
-				handleRestoredBucketState(bucketState);
-			}
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("{} (taskIdx= {}) restored {}", getClass().getSimpleName(), subtaskIndex, bucketState);
-			}
-		} else {
-			LOG.info("No state to restore for the {} (taskIdx= {}).", getClass().getSimpleName(), subtaskIndex);
-		}
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-
-		partCounter = 0;
-
-		this.writer = writerTemplate.duplicate();
-
-		bucketState = new BucketState();
-	}
-
-	/**
-	 * Create a file system with the user-defined hdfs config
-	 * @throws IOException
-	 */
-	private void initFileSystem() throws IOException {
-		if (fs != null) {
-			return;
-		}
-		org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
-		if (fsConfig != null) {
-			String disableCacheName = String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme());
-			hadoopConf.setBoolean(disableCacheName, true);
-			for (String key : fsConfig.keySet()) {
-				hadoopConf.set(key, fsConfig.getString(key, null));
-			}
-		}
-
-		fs = new Path(basePath).getFileSystem(hadoopConf);
-	}
-
-	@Override
-	public void close() throws Exception {
-		closeCurrentPartFile();
-	}
-
-	@Override
-	public void invoke(T value) throws Exception {
-		if (shouldRoll()) {
-			openNewPartFile();
-		}
-		writer.write(value);
-	}
-
-	/**
-	 * Determines whether we should change the bucket file we are writing to.
-	 *
-	 * <p>
-	 * This will roll if no file was created yet, if the file size is larger than the specified size
-	 * or if the {@code Bucketer} determines that we should roll.
-	 */
-	private boolean shouldRoll() throws IOException {
-		boolean shouldRoll = false;
-		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-		if (!isWriterOpen) {
-			shouldRoll = true;
-			LOG.debug("RollingSink {} starting new initial bucket. ", subtaskIndex);
-		}
-		if (bucketer.shouldStartNewBucket(new Path(basePath), currentBucketDirectory)) {
-			shouldRoll = true;
-			LOG.debug("RollingSink {} starting new bucket because {} said we should. ", subtaskIndex, bucketer);
-			// we will retrieve a new bucket base path in openNewPartFile so reset the part counter
-			partCounter = 0;
-		}
-		if (isWriterOpen) {
-			long writePosition = writer.getPos();
-			if (isWriterOpen && writePosition > batchSize) {
-				shouldRoll = true;
-				LOG.debug(
-						"RollingSink {} starting new bucket because file position {} is above batch size {}.",
-						subtaskIndex,
-						writePosition,
-						batchSize);
-			}
-		}
-		return shouldRoll;
-	}
-
-	/**
-	 * Opens a new part file.
-	 *
-	 * <p>
-	 * This closes the old bucket file and retrieves a new bucket path from the {@code Bucketer}.
-	 */
-	private void openNewPartFile() throws Exception {
-		closeCurrentPartFile();
-
-		Path newBucketDirectory = bucketer.getNextBucketPath(new Path(basePath));
-
-		if (!newBucketDirectory.equals(currentBucketDirectory)) {
-			currentBucketDirectory = newBucketDirectory;
-			try {
-				if (fs.mkdirs(currentBucketDirectory)) {
-					LOG.debug("Created new bucket directory: {}", currentBucketDirectory);
-				}
-			} catch (IOException e) {
-				throw new RuntimeException("Could not create base path for new rolling file.", e);
-			}
-		}
-
-		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-		currentPartPath = new Path(currentBucketDirectory, partPrefix + "-" + subtaskIndex + "-" + partCounter);
-
-		// This should work since there is only one parallel subtask that tries names with
-		// our subtask id. Otherwise we would run into concurrency issues here.
-		while (fs.exists(currentPartPath) ||
-				fs.exists(getPendingPathFor(currentPartPath)) ||
-				fs.exists(getInProgressPathFor(currentPartPath))) {
-			partCounter++;
-			currentPartPath = new Path(currentBucketDirectory, partPrefix + "-" + subtaskIndex + "-" + partCounter);
-		}
-
-		// increase, so we don't have to check for this name next time
-		partCounter++;
-
-		LOG.debug("Next part path is {}", currentPartPath.toString());
-
-		Path inProgressPath = getInProgressPathFor(currentPartPath);
-		writer.open(fs, inProgressPath);
-		isWriterOpen = true;
-	}
-
-	private Path getPendingPathFor(Path path) {
-		return new Path(path.getParent(), pendingPrefix + path.getName()).suffix(pendingSuffix);
-	}
-
-	private Path getInProgressPathFor(Path path) {
-		return new Path(path.getParent(), inProgressPrefix + path.getName()).suffix(inProgressSuffix);
-	}
-
-	private Path getValidLengthPathFor(Path path) {
-		return new Path(path.getParent(), validLengthPrefix + path.getName()).suffix(validLengthSuffix);
-	}
-
-	/**
-	 * Closes the current part file.
-	 *
-	 * <p>
-	 * This moves the current in-progress part file to a pending file and adds it to the list
-	 * of pending files in our bucket state.
-	 */
-	private void closeCurrentPartFile() throws Exception {
-		if (isWriterOpen) {
-			writer.close();
-			isWriterOpen = false;
-		}
-
-		if (currentPartPath != null) {
-			Path inProgressPath = getInProgressPathFor(currentPartPath);
-			Path pendingPath = getPendingPathFor(currentPartPath);
-			fs.rename(inProgressPath, pendingPath);
-			LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, pendingPath);
-			this.bucketState.pendingFiles.add(currentPartPath.toString());
-		}
-	}
-
-	/**
-	 * Gets the truncate() call using reflection.
-	 * <p>
-	 * <b>NOTE: </b>This code comes from Flume
-	 */
-	private Method reflectTruncate(FileSystem fs) {
-		Method m = null;
-		if (fs != null) {
-			Class<?> fsClass = fs.getClass();
-			try {
-				m = fsClass.getMethod("truncate", Path.class, long.class);
-			} catch (NoSuchMethodException ex) {
-				LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
-						" and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-				return null;
-			}
-
-			// verify that truncate actually works
-			FSDataOutputStream outputStream;
-			Path testPath = new Path(UUID.randomUUID().toString());
-			try {
-				outputStream = fs.create(testPath);
-				outputStream.writeUTF("hello");
-				outputStream.close();
-			} catch (IOException e) {
-				LOG.error("Could not create file for checking if truncate works.", e);
-				throw new RuntimeException("Could not create file for checking if truncate works.", e);
-			}
-
-			try {
-				m.invoke(fs, testPath, 2);
-			} catch (IllegalAccessException | InvocationTargetException e) {
-				LOG.debug("Truncate is not supported.", e);
-				m = null;
-			}
-
-			try {
-				fs.delete(testPath, false);
-			} catch (IOException e) {
-				LOG.error("Could not delete truncate test file.", e);
-				throw new RuntimeException("Could not delete truncate test file.", e);
-			}
-		}
-		return m;
-	}
-
-	@Override
-	public void notifyCheckpointComplete(long checkpointId) throws Exception {
-		synchronized (bucketState.pendingFilesPerCheckpoint) {
-			Iterator<Map.Entry<Long, List<String>>> pendingCheckpointsIt =
-				bucketState.pendingFilesPerCheckpoint.entrySet().iterator();
-
-			while (pendingCheckpointsIt.hasNext()) {
-				Map.Entry<Long, List<String>> entry = pendingCheckpointsIt.next();
-				Long pastCheckpointId = entry.getKey();
-
-				if (pastCheckpointId <= checkpointId) {
-					LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
-					// All the pending files are buckets that have been completed but are waiting to be renamed
-					// to their final name
-					for (String filename : entry.getValue()) {
-						Path finalPath = new Path(filename);
-						Path pendingPath = getPendingPathFor(finalPath);
-
-						fs.rename(pendingPath, finalPath);
-						LOG.debug("Moving pending file {} to final location after complete checkpoint {}.",
-								pendingPath, pastCheckpointId);
-					}
-					pendingCheckpointsIt.remove();
-				}
-			}
-		}
-	}
-
-	@Override
-	public void snapshotState(FunctionSnapshotContext context) throws Exception {
-		Preconditions.checkNotNull(restoredBucketStates,
-			"The " + getClass().getSimpleName() + " has not been properly initialized.");
-
-		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
-		
-		if (isWriterOpen) {
-			bucketState.currentFile = currentPartPath.toString();
-			bucketState.currentFileValidLength = writer.flush();
-		}
-
-		synchronized (bucketState.pendingFilesPerCheckpoint) {
-			bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
-		}
-		bucketState.pendingFiles = new ArrayList<>();
-
-		restoredBucketStates.clear();
-		restoredBucketStates.add(bucketState);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("{} (taskIdx={}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, bucketState);
-		}
-	}
-
-	private void handleRestoredBucketState(BucketState bucketState) {
-		// we can clean all the pending files since they were renamed to
-		// final files after this checkpoint was successful
-		// (we re-start from the last **successful** checkpoint)
-		bucketState.pendingFiles.clear();
-
-		if (bucketState.currentFile != null) {
-			// We were writing to a file when the last checkpoint occurred. This file can either
-			// be still in-progress or became a pending file at some point after the checkpoint.
-			// Either way, we have to truncate it back to a valid state (or write a .valid-length
-			// file that specifies up to which length it is valid) and rename it to the final name
-			// before starting a new bucket file.
-			Path partPath = new Path(bucketState.currentFile);
-			try {
-				Path partPendingPath = getPendingPathFor(partPath);
-				Path partInProgressPath = getInProgressPathFor(partPath);
-
-				if (fs.exists(partPendingPath)) {
-					LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
-					// has been moved to pending in the mean time, rename to final location
-					fs.rename(partPendingPath, partPath);
-				} else if (fs.exists(partInProgressPath)) {
-					LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
-					// it was still in progress, rename to final path
-					fs.rename(partInProgressPath, partPath);
-				} else if (fs.exists(partPath)) {
-					LOG.debug("In-Progress file {} was already moved to final location {}.", bucketState.currentFile, partPath);
-				} else {
-					LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
-							"it was moved to final location by a previous snapshot restore", bucketState.currentFile);
-				}
-
-				if (this.refTruncate == null) {
-					this.refTruncate = reflectTruncate(fs);
-				}
-
-				// truncate it or write a ".valid-length" file to specify up to which point it is valid
-				if (refTruncate != null) {
-					LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength);
-					// some-one else might still hold the lease from a previous try, we are
-					// recovering, after all ...
-					if (fs instanceof DistributedFileSystem) {
-						DistributedFileSystem dfs = (DistributedFileSystem) fs;
-						LOG.debug("Trying to recover file lease {}", partPath);
-						dfs.recoverLease(partPath);
-						boolean isclosed= dfs.isFileClosed(partPath);
-						StopWatch sw = new StopWatch();
-						sw.start();
-						while(!isclosed) {
-							if(sw.getTime() > asyncTimeout) {
-								break;
-							}
-							try {
-								Thread.sleep(500);
-							} catch (InterruptedException e1) {
-								// ignore it
-							}
-							isclosed = dfs.isFileClosed(partPath);
-						}
-					}
-					Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength);
-					if (!truncated) {
-						LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);
-
-						// we must wait for the asynchronous truncate operation to complete
-						StopWatch sw = new StopWatch();
-						sw.start();
-						long newLen = fs.getFileStatus(partPath).getLen();
-						while(newLen != bucketState.currentFileValidLength) {
-							if(sw.getTime() > asyncTimeout) {
-								break;
-							}
-							try {
-								Thread.sleep(500);
-							} catch (InterruptedException e1) {
-								// ignore it
-							}
-							newLen = fs.getFileStatus(partPath).getLen();
-						}
-						if (newLen != bucketState.currentFileValidLength) {
-							throw new RuntimeException("Truncate did not truncate to right length. Should be " + bucketState.currentFileValidLength + " is " + newLen + ".");
-						}
-					}
-
-				} else {
-					LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength);
-					Path validLengthFilePath = getValidLengthPathFor(partPath);
-					if (!fs.exists(validLengthFilePath)) {
-						FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath);
-						lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength));
-						lengthFileOut.close();
-					}
-				}
-
-				// invalidate in the state object
-				bucketState.currentFile = null;
-				bucketState.currentFileValidLength = -1;
-				isWriterOpen = false;
-			} catch (IOException e) {
-				LOG.error("Error while restoring RollingSink state.", e);
-				throw new RuntimeException("Error while restoring RollingSink state.", e);
-			} catch (InvocationTargetException | IllegalAccessException e) {
-				LOG.error("Could not invoke truncate.", e);
-				throw new RuntimeException("Could not invoke truncate.", e);
-			}
-		}
-
-		// Move files that are confirmed by a checkpoint but did not get moved to final location
-		// because the checkpoint notification did not happen before a failure
-
-		Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
-		LOG.debug("Moving pending files to final location on restore.");
-		for (Long pastCheckpointId : pastCheckpointIds) {
-			// All the pending files are buckets that have been completed but are waiting to be renamed
-			// to their final name
-			for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
-				Path finalPath = new Path(filename);
-				Path pendingPath = getPendingPathFor(finalPath);
-
-				try {
-					if (fs.exists(pendingPath)) {
-						LOG.debug("(RESTORE) Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
-						fs.rename(pendingPath, finalPath);
-					}
-				} catch (IOException e) {
-					LOG.error("(RESTORE) Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
-					throw new RuntimeException("Error while renaming pending file " + pendingPath+ " to final path " + finalPath, e);
-				}
-			}
-		}
-
-		synchronized (bucketState.pendingFilesPerCheckpoint) {
-			bucketState.pendingFilesPerCheckpoint.clear();
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Setters for User configuration values
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Sets the maximum bucket size in bytes.
-	 *
-	 * <p>
-	 * When a bucket part file becomes larger than this size a new bucket part file is started and
-	 * the old one is closed. The name of the bucket files depends on the {@link Bucketer}.
-	 *
-	 * @param batchSize The bucket part file size in bytes.
-	 */
-	public RollingSink<T> setBatchSize(long batchSize) {
-		this.batchSize = batchSize;
-		return this;
-	}
-
-	/**
-	 * Sets the {@link Bucketer} to use for determining the bucket files to write to.
-	 *
-	 * @param bucketer The bucketer to use.
-	 */
-	public RollingSink<T> setBucketer(Bucketer bucketer) {
-		this.bucketer = bucketer;
-		return this;
-	}
-
-	/**
-	 * Sets the {@link Writer} to be used for writing the incoming elements to bucket files.
-	 *
-	 * @param writer The {@code Writer} to use.
-	 */
-	public RollingSink<T> setWriter(Writer<T> writer) {
-		this.writerTemplate = writer;
-		return this;
-	}
-
-	/**
-	 * Sets the suffix of in-progress part files. The default is {@code "in-progress"}.
-	 */
-	public RollingSink<T> setInProgressSuffix(String inProgressSuffix) {
-		this.inProgressSuffix = inProgressSuffix;
-		return this;
-	}
-
-	/**
-	 * Sets the prefix of in-progress part files. The default is {@code "_"}.
-	 */
-	public RollingSink<T> setInProgressPrefix(String inProgressPrefix) {
-		this.inProgressPrefix = inProgressPrefix;
-		return this;
-	}
-
-	/**
-	 * Sets the suffix of pending part files. The default is {@code ".pending"}.
-	 */
-	public RollingSink<T> setPendingSuffix(String pendingSuffix) {
-		this.pendingSuffix = pendingSuffix;
-		return this;
-	}
-
-	/**
-	 * Sets the prefix of pending part files. The default is {@code "_"}.
-	 */
-	public RollingSink<T> setPendingPrefix(String pendingPrefix) {
-		this.pendingPrefix = pendingPrefix;
-		return this;
-	}
-
-	/**
-	 * Sets the suffix of valid-length files. The default is {@code ".valid-length"}.
-	 */
-	public RollingSink<T> setValidLengthSuffix(String validLengthSuffix) {
-		this.validLengthSuffix = validLengthSuffix;
-		return this;
-	}
-
-	/**
-	 * Sets the prefix of valid-length files. The default is {@code "_"}.
-	 */
-	public RollingSink<T> setValidLengthPrefix(String validLengthPrefix) {
-		this.validLengthPrefix = validLengthPrefix;
-		return this;
-	}
-
-	/**
-	 * Sets the prefix of part files.  The default is {@code "part"}.
-	 */
-	public RollingSink<T> setPartPrefix(String partPrefix) {
-		this.partPrefix = partPrefix;
-		return this;
-	}
-
-	/**
-	 * Disable cleanup of leftover in-progress/pending files when the sink is opened.
-	 *
-	 * <p>
-	 * This should only be disabled if using the sink without checkpoints, to not remove
-	 * the files already in the directory.
-	 *
-	 * @deprecated This option is deprecated and remains only for backwards compatibility.
-	 * We do not clean up lingering files anymore.
-	 */
-	@Deprecated
-	public RollingSink<T> disableCleanupOnOpen() {
-		return this;
-	}
-
-	/**
-	 * Sets the default timeout for asynchronous operations such as recoverLease and truncate.
-	 *
-	 * @param timeout The timeout, in milliseconds.
-	 */
-	public RollingSink<T> setAsyncTimeout(long timeout) {
-		this.asyncTimeout = timeout;
-		return this;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Internal Classes
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * This is used for keeping track of the current in-progress files and files that we mark
-	 * for moving from pending to final location after we get a checkpoint-complete notification.
-	 */
-	static final class BucketState implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		/**
-		 * The file that was in-progress when the last checkpoint occurred.
-		 */
-		String currentFile;
-
-		/**
-		 * The valid length of the in-progress file at the time of the last checkpoint.
-		 */
-		long currentFileValidLength = -1;
-
-		/**
-		 * Pending files that accumulated since the last checkpoint.
-		 */
-		List<String> pendingFiles = new ArrayList<>();
-
-		/**
-		 * When doing a checkpoint we move the pending files since the last checkpoint to this map
-		 * with the id of the checkpoint. When we get the checkpoint-complete notification we move
-		 * pending files of completed checkpoints to their final location.
-		 */
-		final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
-
-		@Override
-		public String toString() {
-			return
-				"In-progress=" + currentFile +
-				" validLength=" + currentFileValidLength +
-				" pendingForNextCheckpoint=" + pendingFiles +
-				" pendingForPrevCheckpoints=" + pendingFilesPerCheckpoint;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
deleted file mode 100644
index 08c0d0a..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-
-import java.io.IOException;
-
-/**
- * A {@link Writer} that writes the bucket files as Hadoop {@link SequenceFile SequenceFiles}.
- * The input to the {@link BucketingSink} must
- * be a {@link org.apache.flink.api.java.tuple.Tuple2} of two Hadopo
- * {@link org.apache.hadoop.io.Writable Writables}.
- *
- * @param <K> The type of the first tuple field.
- * @param <V> The type of the second tuple field.
- */
-public class SequenceFileWriter<K extends Writable, V extends Writable> extends StreamWriterBase<Tuple2<K, V>> implements InputTypeConfigurable {
-	private static final long serialVersionUID = 1L;
-
-	private final String compressionCodecName;
-
-	private SequenceFile.CompressionType compressionType;
-
-	private transient SequenceFile.Writer writer;
-
-	private Class<K> keyClass;
-
-	private Class<V> valueClass;
-
-	/**
-	 * Creates a new {@code SequenceFileWriter} that writes sequence files without compression.
-	 */
-	public SequenceFileWriter() {
-		this("None", SequenceFile.CompressionType.NONE);
-	}
-
-	/**
-	 * Creates a new {@code SequenceFileWriter} that writes sequence with the given
-	 * compression codec and compression type.
-	 *
-	 * @param compressionCodecName Name of a Hadoop Compression Codec.
-	 * @param compressionType The compression type to use.
-	 */
-	public SequenceFileWriter(String compressionCodecName,
-			SequenceFile.CompressionType compressionType) {
-		this.compressionCodecName = compressionCodecName;
-		this.compressionType = compressionType;
-	}
-
-	@Override
-	public void open(FileSystem fs, Path path) throws IOException {
-		super.open(fs, path);
-		if (keyClass == null) {
-			throw new IllegalStateException("Key Class has not been initialized.");
-		}
-		if (valueClass == null) {
-			throw new IllegalStateException("Value Class has not been initialized.");
-		}
-
-		CompressionCodec codec = null;
-		
-		Configuration conf = HadoopFileSystem.getHadoopConfiguration();
-
-		if (!compressionCodecName.equals("None")) {
-			CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
-			codec = codecFactory.getCodecByName(compressionCodecName);
-			if (codec == null) {
-				throw new RuntimeException("Codec " + compressionCodecName + " not found.");
-			}
-		}
-
-		// the non-deprecated constructor syntax is only available in recent hadoop versions...
-		writer = SequenceFile.createWriter(conf,
-				getStream(),
-				keyClass,
-				valueClass,
-				compressionType,
-				codec);
-	}
-
-	@Override
-	public void close() throws IOException {
-		if (writer != null) {
-			writer.close();
-		}
-		super.close();
-	}
-
-	@Override
-	public void write(Tuple2<K, V> element) throws IOException {
-		getStream(); // Throws if the stream is not open
-		writer.append(element.f0, element.f1);
-	}
-
-	@Override
-	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
-		if (!type.isTupleType()) {
-			throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
-		}
-
-		TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type;
-
-		if (tupleType.getArity() != 2) {
-			throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type.");
-		}
-
-		TypeInformation<K> keyType = tupleType.getTypeAt(0);
-		TypeInformation<V> valueType = tupleType.getTypeAt(1);
-
-		this.keyClass = keyType.getTypeClass();
-		this.valueClass = valueType.getTypeClass();
-	}
-
-	@Override
-	public Writer<Tuple2<K, V>> duplicate() {
-		SequenceFileWriter<K, V> result = new SequenceFileWriter<>(compressionCodecName, compressionType);
-		result.keyClass = keyClass;
-		result.valueClass = valueClass;
-		return result;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
deleted file mode 100644
index 140246f..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-/**
- * Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}.
- */
-public abstract class StreamWriterBase<T> implements Writer<T> {
-
-	private static Logger LOG = LoggerFactory.getLogger(BucketingSink.class);
-
-	/**
-	 * The {@code FSDataOutputStream} for the current part file.
-	 */
-	private transient FSDataOutputStream outStream;
-
-	/**
-	 * We use reflection to get the hflush method or use sync as a fallback.
-	 * The idea for this and the code comes from the Flume HDFS Sink.
-	 */
-	private transient Method refHflushOrSync;
-
-	/**
-	 * Returns the current output stream, if the stream is open.
-	 */
-	protected FSDataOutputStream getStream() {
-		if (outStream == null) {
-			throw new IllegalStateException("Output stream has not been opened");
-		}
-		return outStream;
-	}
-
-	/**
-	 * If hflush is available in this version of HDFS, then this method calls
-	 * hflush, else it calls sync.
-	 * @param os - The stream to flush/sync
-	 * @throws java.io.IOException
-	 *
-	 * <p>
-	 * Note: This code comes from Flume
-	 */
-	protected void hflushOrSync(FSDataOutputStream os) throws IOException {
-		try {
-			// At this point the refHflushOrSync cannot be null,
-			// since register method would have thrown if it was.
-			this.refHflushOrSync.invoke(os);
-		} catch (InvocationTargetException e) {
-			String msg = "Error while trying to hflushOrSync!";
-			LOG.error(msg + " " + e.getCause());
-			Throwable cause = e.getCause();
-			if(cause != null && cause instanceof IOException) {
-				throw (IOException)cause;
-			}
-			throw new RuntimeException(msg, e);
-		} catch (Exception e) {
-			String msg = "Error while trying to hflushOrSync!";
-			LOG.error(msg + " " + e);
-			throw new RuntimeException(msg, e);
-		}
-	}
-
-	/**
-	 * Gets the hflush call using reflection. Fallback to sync if hflush is not available.
-	 *
-	 * <p>
-	 * Note: This code comes from Flume
-	 */
-	private Method reflectHflushOrSync(FSDataOutputStream os) {
-		Method m = null;
-		if(os != null) {
-			Class<?> fsDataOutputStreamClass = os.getClass();
-			try {
-				m = fsDataOutputStreamClass.getMethod("hflush");
-			} catch (NoSuchMethodException ex) {
-				LOG.debug("HFlush not found. Will use sync() instead");
-				try {
-					m = fsDataOutputStreamClass.getMethod("sync");
-				} catch (Exception ex1) {
-					String msg = "Neither hflush not sync were found. That seems to be " +
-							"a problem!";
-					LOG.error(msg);
-					throw new RuntimeException(msg, ex1);
-				}
-			}
-		}
-		return m;
-	}
-
-	@Override
-	public void open(FileSystem fs, Path path) throws IOException {
-		if (outStream != null) {
-			throw new IllegalStateException("Writer has already been opened");
-		}
-		outStream = fs.create(path, false);
-		if (refHflushOrSync == null) {
-			refHflushOrSync = reflectHflushOrSync(outStream);
-		}
-	}
-
-	@Override
-	public long flush() throws IOException {
-		if (outStream == null) {
-			throw new IllegalStateException("Writer is not open");
-		}
-		hflushOrSync(outStream);
-		return outStream.getPos();
-	}
-
-	@Override
-	public long getPos() throws IOException {
-		if (outStream == null) {
-			throw new IllegalStateException("Writer is not open");
-		}
-		return outStream.getPos();
-	}
-
-	@Override
-	public void close() throws IOException {
-		if (outStream != null) {
-			flush();
-			outStream.close();
-			outStream = null;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
deleted file mode 100644
index 6568a86..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.UnsupportedCharsetException;
-
-/**
- * A {@link Writer} that uses {@code toString()} on the input elements and writes them to
- * the output bucket file separated by newline.
- *
- * @param <T> The type of the elements that are being written by the sink.
- */
-public class StringWriter<T> extends StreamWriterBase<T> {
-	private static final long serialVersionUID = 1L;
-
-	private String charsetName;
-
-	private transient Charset charset;
-
-	/**
-	 * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
-	 * strings to bytes.
-	 */
-	public StringWriter() {
-		this("UTF-8");
-	}
-
-	/**
-	 * Creates a new {@code StringWriter} that uses the given charset to convert
-	 * strings to bytes.
-	 *
-	 * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
-	 */
-	public StringWriter(String charsetName) {
-		this.charsetName = charsetName;
-	}
-
-	@Override
-	public void open(FileSystem fs, Path path) throws IOException {
-		super.open(fs, path);
-
-		try {
-			this.charset = Charset.forName(charsetName);
-		}
-		catch (IllegalCharsetNameException e) {
-			throw new IOException("The charset " + charsetName + " is not valid.", e);
-		}
-		catch (UnsupportedCharsetException e) {
-			throw new IOException("The charset " + charsetName + " is not supported.", e);
-		}
-	}
-
-	@Override
-	public void write(T element) throws IOException {
-		FSDataOutputStream outputStream = getStream();
-		outputStream.write(element.toString().getBytes(charset));
-		outputStream.write('\n');
-	}
-
-	@Override
-	public Writer<T> duplicate() {
-		return new StringWriter<>();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
deleted file mode 100644
index 41663df..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-
-/**
- * A {@link Clock} that uses {@code System.currentTimeMillis()} to determine the system time.
- */
-public class SystemClock implements Clock {
-	@Override
-	public long currentTimeMillis() {
-		return System.currentTimeMillis();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
deleted file mode 100644
index c3b4cb6..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs;
-
-import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * An implementation of {@code Writer} is used in conjunction with a
- * {@link BucketingSink} to perform the actual
- * writing to the bucket files.
- *
- * @param <T> The type of the elements that are being written by the sink.
- */
-public interface Writer<T> extends Serializable {
-
-	/**
-	 * Initializes the {@code Writer} for a newly opened bucket file.
-	 * Any internal per-bucket initialization should be performed here.
-	 *
-	 * @param fs The {@link org.apache.hadoop.fs.FileSystem} containing the newly opened file.
-	 * @param path The {@link org.apache.hadoop.fs.Path} of the newly opened file.
-	 */
-	void open(FileSystem fs, Path path) throws IOException;
-
-	/**
-	 * Flushes out any internally held data, and returns the offset that the file
-	 * must be truncated to at recovery.
-	 */
-	long flush() throws IOException;
-
-	/**
-	 * Retrieves the current position, and thus size, of the output file.
-	 */
-	long getPos() throws IOException;
-
-	/**
-	 * Closes the {@code Writer}. If the writer is already closed, no action will be
-	 * taken. The call should close all state related to the current output file,
-	 * including the output stream opened in {@code open}.
-	 */
-	void close() throws IOException ;
-
-	/**
-	 * Writes one element to the bucket file.
-	 */
-	void write(T element)throws IOException;
-
-	/**
-	 * Duplicates the {@code Writer}. This is used to get one {@code Writer} for each
-	 * parallel instance of the sink.
-	 */
-	Writer<T> duplicate();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
deleted file mode 100644
index 0bf14b3..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs.bucketing;
-
-import org.apache.flink.streaming.connectors.fs.Clock;
-import org.apache.hadoop.fs.Path;
-
-/**
- * A {@link Bucketer} that does not perform any
- * bucketing of files. All files are written to the base path.
- */
-public class BasePathBucketer<T> implements Bucketer<T> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public Path getBucketPath(Clock clock, Path basePath, T element) {
-		return basePath;
-	}
-
-	@Override
-	public String toString() {
-		return "BasePathBucketer";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
deleted file mode 100644
index 86aa9f3..0000000
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.fs.bucketing;
-
-import org.apache.flink.streaming.connectors.fs.Clock;
-import org.apache.hadoop.fs.Path;
-
-import java.io.Serializable;
-
-/**
- * A bucketer is used with a {@link BucketingSink}
- * to put emitted elements into rolling files.
- *
- * <p>
- * The {@code BucketingSink} can be writing to many buckets at a time, and it is responsible for managing
- * a set of active buckets. Whenever a new element arrives it will ask the {@code Bucketer} for the bucket
- * path the element should fall in. The {@code Bucketer} can, for example, determine buckets based on
- * system time.
- */
-public interface Bucketer<T> extends Serializable {
-	/**
-	 * Returns the {@link Path} of a bucket file.
-	 *
-	 * @param basePath The base path containing all the buckets.
-	 * @param element The current element being processed.
-	 *
-	 * @return The complete {@code Path} of the bucket which the provided element should fall in. This
-	 * should include the {@code basePath} and also the {@code subtaskIndex} to avoid clashes with
-	 * parallel sinks.
-	 */
-	Path getBucketPath(Clock clock, Path basePath, T element);
-}