You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/25 02:42:10 UTC

[GitHub] XuQianJin-Stars closed pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed

XuQianJin-Stars closed pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index c1ef344175b..8eb43424264 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -62,6 +62,9 @@
 	// Charset is not serializable
 	private transient Charset charset;
 
+	/** The charset of bom in the file to process. */
+	private String bomCharsetName;
+
 	/**
 	 * The default read buffer size = 1MB.
 	 */
@@ -221,6 +224,11 @@ public void setCharset(String charset) {
 		}
 	}
 
+	@PublicEvolving
+	public String getBomCharsetName() {
+		return this.bomCharsetName;
+	}
+
 	public byte[] getDelimiter() {
 		return delimiter;
 	}
@@ -341,7 +349,7 @@ public void configure(Configuration parameters) {
 	@Override
 	public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
 		
-		final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ?
+		final FileBaseStatistics cachedFileStats = cachedStats instanceof FileInputFormat.FileBaseStatistics ?
 				(FileBaseStatistics) cachedStats : null;
 		
 		// store properties
@@ -408,7 +416,9 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 			while (samplesTaken < numSamples && fileNum < allFiles.size()) {
 				// make a split for the sample and use it to read a record
 				FileStatus file = allFiles.get(fileNum);
-				FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);
+				String bomCharsetName = getBomCharset(file);
+
+				FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null, bomCharsetName);
 
 				// we open the split, read one line, and take its length
 				try {
@@ -467,6 +477,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 	 */
 	@Override
 	public void open(FileInputSplit split) throws IOException {
+		this.bomCharsetName = split.getBomCharsetName();
 		super.open(split);
 		initBuffers();
 
@@ -736,7 +747,7 @@ public void reopen(FileInputSplit split, Long state) throws IOException {
 		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
 		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
 		Preconditions.checkArgument(state == -1 || state >= split.getStart(),
-			" Illegal offset "+ state +", smaller than the splits start=" + split.getStart());
+			" Illegal offset " + state + ", smaller than the splits start=" + split.getStart());
 
 		try {
 			this.open(split);
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 14cf647cd24..c58344fba6c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -34,7 +34,7 @@
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-
+import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,20 +55,20 @@
  * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be implemented.
  * Additionally, one may override {@link #open(FileInputSplit)} and {@link #close()} to
  * change the life cycle behavior.
- * 
+ *
  * <p>After the {@link #open(FileInputSplit)} method completed, the file input data is available
  * from the {@link #stream} field.</p>
  */
 @Public
 public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputSplit> {
-	
+
 	// -------------------------------------- Constants -------------------------------------------
-	
+
 	private static final Logger LOG = LoggerFactory.getLogger(FileInputFormat.class);
-	
+
 	private static final long serialVersionUID = 1L;
-	
-	
+
+
 	/**
 	 * The fraction that the last split may be larger than the others.
 	 */
@@ -84,8 +84,8 @@
 	 * unsplittable files.
 	 */
 	protected static final Map<String, InflaterInputStreamFactory<?>> INFLATER_INPUT_STREAM_FACTORIES =
-			new HashMap<String, InflaterInputStreamFactory<?>>();
-	
+		new HashMap<String, InflaterInputStreamFactory<?>>();
+
 	/**
 	 * The splitLength is set to -1L for reading the whole split.
 	 */
@@ -96,9 +96,11 @@
 		initDefaultInflaterInputStreamFactories();
 	}
 
+
 	/**
 	 * Initialize defaults for input format. Needs to be a static method because it is configured for local
 	 * cluster execution, see LocalFlinkMiniCluster.
+	 *
 	 * @param configuration The configuration to load defaults from
 	 */
 	private static void initDefaultsFromConfiguration(Configuration configuration) {
@@ -117,10 +119,10 @@ private static void initDefaultsFromConfiguration(Configuration configuration) {
 
 	private static void initDefaultInflaterInputStreamFactories() {
 		InflaterInputStreamFactory<?>[] defaultFactories = {
-				DeflateInflaterInputStreamFactory.getInstance(),
-				GzipInflaterInputStreamFactory.getInstance(),
-				Bzip2InputStreamFactory.getInstance(),
-				XZInputStreamFactory.getInstance(),
+			DeflateInflaterInputStreamFactory.getInstance(),
+			GzipInflaterInputStreamFactory.getInstance(),
+			Bzip2InputStreamFactory.getInstance(),
+			XZInputStreamFactory.getInstance(),
 		};
 		for (InflaterInputStreamFactory<?> inputStreamFactory : defaultFactories) {
 			for (String fileExtension : inputStreamFactory.getCommonFileExtensions()) {
@@ -132,8 +134,9 @@ private static void initDefaultInflaterInputStreamFactories() {
 	/**
 	 * Registers a decompression algorithm through a {@link org.apache.flink.api.common.io.compression.InflaterInputStreamFactory}
 	 * with a file extension for transparent decompression.
+	 *
 	 * @param fileExtension of the compressed files
-	 * @param factory to create an {@link java.util.zip.InflaterInputStream} that handles the decompression format
+	 * @param factory       to create an {@link java.util.zip.InflaterInputStream} that handles the decompression format
 	 */
 	public static void registerInflaterInputStreamFactory(String fileExtension, InflaterInputStreamFactory<?> factory) {
 		synchronized (INFLATER_INPUT_STREAM_FACTORIES) {
@@ -151,23 +154,24 @@ public static void registerInflaterInputStreamFactory(String fileExtension, Infl
 
 	/**
 	 * Returns the extension of a file name (!= a path).
+	 *
 	 * @return the extension of the file name or {@code null} if there is no extension.
 	 */
 	protected static String extractFileExtension(String fileName) {
 		checkNotNull(fileName);
 		int lastPeriodIndex = fileName.lastIndexOf('.');
-		if (lastPeriodIndex < 0){
+		if (lastPeriodIndex < 0) {
 			return null;
 		} else {
 			return fileName.substring(lastPeriodIndex + 1);
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Variables for internal operation.
-	//  They are all transient, because we do not want them so be serialized 
+	//  They are all transient, because we do not want them so be serialized
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * The input stream reading from the input file.
 	 */
@@ -187,16 +191,16 @@ protected static String extractFileExtension(String fileName) {
 	 * The current split that this parallel instance must consume.
 	 */
 	protected transient FileInputSplit currentSplit;
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  The configuration parameters. Configured on the instance and serialized to be shipped.
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * The path to the file that contains the input.
 	 *
 	 * @deprecated Please override {@link FileInputFormat#supportsMultiPaths()} and
-	 *             use {@link FileInputFormat#getFilePaths()} and {@link FileInputFormat#setFilePaths(Path...)}.
+	 * use {@link FileInputFormat#getFilePaths()} and {@link FileInputFormat#setFilePaths(Path...)}.
 	 */
 	@Deprecated
 	protected Path filePath;
@@ -205,22 +209,22 @@ protected static String extractFileExtension(String fileName) {
 	 * The list of paths to files and directories that contain the input.
 	 */
 	private Path[] filePaths;
-	
+
 	/**
 	 * The minimal split size, set by the configure() method.
 	 */
-	protected long minSplitSize = 0; 
-	
+	protected long minSplitSize = 0;
+
 	/**
 	 * The desired number of splits, as set by the configure() method.
 	 */
 	protected int numSplits = -1;
-	
+
 	/**
 	 * Stream opening timeout.
 	 */
 	protected long openTimeout = DEFAULT_OPENING_TIMEOUT;
-	
+
 	/**
 	 * Some file input formats are not splittable on a block level (avro, deflate)
 	 * Therefore, the FileInputFormat can only read whole files.
@@ -240,24 +244,23 @@ protected static String extractFileExtension(String fileName) {
 
 	// --------------------------------------------------------------------------------------------
 	//  Constructors
-	// --------------------------------------------------------------------------------------------	
+	// --------------------------------------------------------------------------------------------
 
-	public FileInputFormat() {}
+	public FileInputFormat() {
+	}
 
 	protected FileInputFormat(Path filePath) {
 		if (filePath != null) {
 			setFilePath(filePath);
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Getters/setters for the configurable parameters
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
-	 *
 	 * @return The path of the file to read.
-	 *
 	 * @deprecated Please use getFilePaths() instead.
 	 */
 	@Deprecated
@@ -276,10 +279,10 @@ public Path getFilePath() {
 			return filePath;
 		}
 	}
-	
+
 	/**
 	 * Returns the paths of all files to be read by the FileInputFormat.
-	 * 
+	 *
 	 * @return The list of all paths to read.
 	 */
 	public Path[] getFilePaths() {
@@ -293,10 +296,10 @@ public Path getFilePath() {
 			if (this.filePath == null) {
 				return new Path[0];
 			}
-			return new Path[] {filePath};
+			return new Path[]{filePath};
 		}
 	}
-	
+
 	public void setFilePath(String filePath) {
 		if (filePath == null) {
 			throw new IllegalArgumentException("File path cannot be null.");
@@ -318,7 +321,7 @@ public void setFilePath(String filePath) {
 			throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage());
 		}
 	}
-	
+
 	/**
 	 * Sets a single path of a file to be read.
 	 *
@@ -331,10 +334,10 @@ public void setFilePath(Path filePath) {
 
 		setFilePaths(filePath);
 	}
-	
+
 	/**
 	 * Sets multiple paths of files to be read.
-	 * 
+	 *
 	 * @param filePaths The paths of the files to read.
 	 */
 	public void setFilePaths(String... filePaths) {
@@ -368,11 +371,11 @@ public void setFilePaths(Path... filePaths) {
 
 		this.filePaths = filePaths;
 	}
-	
+
 	public long getMinSplitSize() {
 		return minSplitSize;
 	}
-	
+
 	public void setMinSplitSize(long minSplitSize) {
 		if (minSplitSize < 0) {
 			throw new IllegalArgumentException("The minimum split size cannot be negative.");
@@ -380,23 +383,23 @@ public void setMinSplitSize(long minSplitSize) {
 
 		this.minSplitSize = minSplitSize;
 	}
-	
+
 	public int getNumSplits() {
 		return numSplits;
 	}
-	
+
 	public void setNumSplits(int numSplits) {
 		if (numSplits < -1 || numSplits == 0) {
 			throw new IllegalArgumentException("The desired number of splits must be positive or -1 (= don't care).");
 		}
-		
+
 		this.numSplits = numSplits;
 	}
-	
+
 	public long getOpenTimeout() {
 		return openTimeout;
 	}
-	
+
 	public void setOpenTimeout(long openTimeout) {
 		if (openTimeout < 0) {
 			throw new IllegalArgumentException("The timeout for opening the input splits must be positive or zero (= infinite).");
@@ -415,7 +418,7 @@ public boolean getNestedFileEnumeration() {
 	// --------------------------------------------------------------------------------------------
 	// Getting information about the split that is currently open
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Gets the start of the current split.
 	 *
@@ -424,7 +427,7 @@ public boolean getNestedFileEnumeration() {
 	public long getSplitStart() {
 		return splitStart;
 	}
-	
+
 	/**
 	 * Gets the length or remaining length of the current split.
 	 *
@@ -441,10 +444,10 @@ public void setFilesFilter(FilePathFilter filesFilter) {
 	// --------------------------------------------------------------------------------------------
 	//  Pre-flight: Configuration, Splits, Sampling
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Configures the file input format by reading the file path from the configuration.
-	 * 
+	 *
 	 * @see org.apache.flink.api.common.io.InputFormat#configure(org.apache.flink.configuration.Configuration)
 	 */
 	@Override
@@ -467,34 +470,33 @@ public void configure(Configuration parameters) {
 
 	/**
 	 * Obtains basic file statistics containing only file size. If the input is a directory, then the size is the sum of all contained files.
-	 * 
+	 *
 	 * @see org.apache.flink.api.common.io.InputFormat#getStatistics(org.apache.flink.api.common.io.statistics.BaseStatistics)
 	 */
 	@Override
 	public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
-		
+
 		final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ?
 			(FileBaseStatistics) cachedStats : null;
-				
+
 		try {
 			return getFileStats(cachedFileStats, getFilePaths(), new ArrayList<>(getFilePaths().length));
 		} catch (IOException ioex) {
 			if (LOG.isWarnEnabled()) {
 				LOG.warn("Could not determine statistics for paths '" + Arrays.toString(getFilePaths()) + "' due to an io error: "
-						+ ioex.getMessage());
+					+ ioex.getMessage());
 			}
-		}
-		catch (Throwable t) {
+		} catch (Throwable t) {
 			if (LOG.isErrorEnabled()) {
 				LOG.error("Unexpected problem while getting the file statistics for paths '" + Arrays.toString(getFilePaths()) + "': "
-						+ t.getMessage(), t);
+					+ t.getMessage(), t);
 			}
 		}
-		
+
 		// no statistics available
 		return null;
 	}
-	
+
 	protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, Path[] filePaths, ArrayList<FileStatus> files) throws IOException {
 
 		long totalLength = 0;
@@ -519,7 +521,7 @@ protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, Path[]
 
 		return new FileBaseStatistics(latestModTime, totalLength, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
 	}
-	
+
 	protected FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, Path filePath, FileSystem fs, ArrayList<FileStatus> files) throws IOException {
 
 		// get the file info and check whether the cached statistics are still valid.
@@ -562,10 +564,9 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits
 	 * Computes the input splits for the file. By default, one file block is one split. If more splits
 	 * are requested than blocks are available, then a split may be a fraction of a block and splits may cross
 	 * block boundaries.
-	 * 
+	 *
 	 * @param minNumSplits The minimum desired number of file splits.
 	 * @return The computed file splits.
-	 * 
 	 * @see org.apache.flink.api.common.io.InputFormat#createInputSplits(int)
 	 */
 	@Override
@@ -573,10 +574,10 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits
 		if (minNumSplits < 1) {
 			throw new IllegalArgumentException("Number of input splits has to be at least 1.");
 		}
-		
+
 		// take the desired number of splits into account
 		minNumSplits = Math.max(minNumSplits, this.numSplits);
-		
+
 		final List<FileInputSplit> inputSplits = new ArrayList<FileInputSplit>(minNumSplits);
 
 		// get all the files that are involved in the splits
@@ -601,23 +602,25 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits
 		if (unsplittable) {
 			int splitNum = 0;
 			for (final FileStatus file : files) {
+				String bomCharsetName = getBomCharset(file);
+
 				final FileSystem fs = file.getPath().getFileSystem();
 				final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
 				Set<String> hosts = new HashSet<String>();
-				for(BlockLocation block : blocks) {
+				for (BlockLocation block : blocks) {
 					hosts.addAll(Arrays.asList(block.getHosts()));
 				}
 				long len = file.getLen();
-				if(testForUnsplittable(file)) {
+				if (testForUnsplittable(file)) {
 					len = READ_WHOLE_SPLIT_FLAG;
 				}
 				FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, len,
-						hosts.toArray(new String[hosts.size()]));
+					hosts.toArray(new String[hosts.size()]), bomCharsetName);
 				inputSplits.add(fis);
 			}
 			return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
 		}
-		
+
 
 		final long maxSplitSize = totalLength / minNumSplits + (totalLength % minNumSplits == 0 ? 0 : 1);
 
@@ -625,17 +628,18 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits
 		int splitNum = 0;
 		for (final FileStatus file : files) {
 
+			String bomCharsetName = getBomCharset(file);
+
 			final FileSystem fs = file.getPath().getFileSystem();
 			final long len = file.getLen();
 			final long blockSize = file.getBlockSize();
-			
+
 			final long minSplitSize;
 			if (this.minSplitSize <= blockSize) {
 				minSplitSize = this.minSplitSize;
-			}
-			else {
+			} else {
 				if (LOG.isWarnEnabled()) {
-					LOG.warn("Minimal split size of " + this.minSplitSize + " is larger than the block size of " + 
+					LOG.warn("Minimal split size of " + this.minSplitSize + " is larger than the block size of " +
 						blockSize + ". Decreasing minimal split size to block size.");
 				}
 				minSplitSize = blockSize;
@@ -662,7 +666,7 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits
 					blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
 					// create a new split
 					FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position, splitSize,
-						blocks[blockIndex].getHosts());
+						blocks[blockIndex].getHosts(), bomCharsetName);
 					inputSplits.add(fis);
 
 					// adjust the positions
@@ -674,7 +678,7 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits
 				if (bytesUnassigned > 0) {
 					blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
 					final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), position,
-						bytesUnassigned, blocks[blockIndex].getHosts());
+						bytesUnassigned, blocks[blockIndex].getHosts(), bomCharsetName);
 					inputSplits.add(fis);
 				}
 			} else {
@@ -686,7 +690,7 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits
 				} else {
 					hosts = new String[0];
 				}
-				final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, 0, hosts);
+				final FileInputSplit fis = new FileInputSplit(splitNum++, file.getPath(), 0, 0, hosts, bomCharsetName);
 				inputSplits.add(fis);
 			}
 		}
@@ -696,32 +700,32 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits
 
 	/**
 	 * Enumerate all files in the directory and recursive if enumerateNestedFiles is true.
+	 *
 	 * @return the total length of accepted files.
 	 */
 	private long addFilesInDir(Path path, List<FileStatus> files, boolean logExcludedFiles)
-			throws IOException {
+		throws IOException {
 		final FileSystem fs = path.getFileSystem();
 
 		long length = 0;
 
-		for(FileStatus dir: fs.listStatus(path)) {
+		for (FileStatus dir : fs.listStatus(path)) {
 			if (dir.isDir()) {
 				if (acceptFile(dir) && enumerateNestedFiles) {
 					length += addFilesInDir(dir.getPath(), files, logExcludedFiles);
 				} else {
 					if (logExcludedFiles && LOG.isDebugEnabled()) {
-						LOG.debug("Directory "+dir.getPath().toString()+" did not pass the file-filter and is excluded.");
+						LOG.debug("Directory " + dir.getPath().toString() + " did not pass the file-filter and is excluded.");
 					}
 				}
-			}
-			else {
-				if(acceptFile(dir)) {
+			} else {
+				if (acceptFile(dir)) {
 					files.add(dir);
 					length += dir.getLen();
 					testForUnsplittable(dir);
 				} else {
 					if (logExcludedFiles && LOG.isDebugEnabled()) {
-						LOG.debug("Directory "+dir.getPath().toString()+" did not pass the file-filter and is excluded.");
+						LOG.debug("Directory " + dir.getPath().toString() + " did not pass the file-filter and is excluded.");
 					}
 				}
 			}
@@ -730,7 +734,7 @@ private long addFilesInDir(Path path, List<FileStatus> files, boolean logExclude
 	}
 
 	protected boolean testForUnsplittable(FileStatus pathFile) {
-		if(getInflaterInputStreamFactory(pathFile.getPath()) != null) {
+		if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {
 			unsplittable = true;
 			return true;
 		}
@@ -751,7 +755,7 @@ protected boolean testForUnsplittable(FileStatus pathFile) {
 	 * A simple hook to filter files and directories from the input.
 	 * The method may be overridden. Hadoop's FileInputFormat has a similar mechanism and applies the
 	 * same filters by default.
-	 * 
+	 *
 	 * @param fileStatus The file status to check.
 	 * @return true, if the given file or directory is accepted
 	 */
@@ -765,9 +769,9 @@ public boolean acceptFile(FileStatus fileStatus) {
 	/**
 	 * Retrieves the index of the <tt>BlockLocation</tt> that contains the part of the file described by the given
 	 * offset.
-	 * 
-	 * @param blocks The different blocks of the file. Must be ordered by their offset.
-	 * @param offset The offset of the position in the file.
+	 *
+	 * @param blocks     The different blocks of the file. Must be ordered by their offset.
+	 * @param offset     The offset of the position in the file.
 	 * @param startIndex The earliest index to look at.
 	 * @return The index of the block containing the given position.
 	 */
@@ -789,14 +793,14 @@ private int getBlockIndexForPosition(BlockLocation[] blocks, long offset, long h
 		}
 		throw new IllegalArgumentException("The given offset is not contained in the any block.");
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
 	 * Opens an input stream to the file defined in the input format.
 	 * The stream is positioned at the beginning of the given split.
 	 * <p>
-	 * The stream is actually opened in an asynchronous thread to make sure any interruptions to the thread 
+	 * The stream is actually opened in an asynchronous thread to make sure any interruptions to the thread
 	 * working on the input format do not reach the file system.
 	 */
 	@Override
@@ -810,20 +814,19 @@ public void open(FileInputSplit fileSplit) throws IOException {
 			LOG.debug("Opening input split " + fileSplit.getPath() + " [" + this.splitStart + "," + this.splitLength + "]");
 		}
 
-		
+
 		// open the split in an asynchronous thread
 		final InputSplitOpenThread isot = new InputSplitOpenThread(fileSplit, this.openTimeout);
 		isot.start();
-		
+
 		try {
 			this.stream = isot.waitForCompletion();
 			this.stream = decorateInputStream(this.stream, fileSplit);
+		} catch (Throwable t) {
+			throw new IOException("Error opening the Input Split " + fileSplit.getPath() +
+				" [" + splitStart + "," + splitLength + "]: " + t.getMessage(), t);
 		}
-		catch (Throwable t) {
-			throw new IOException("Error opening the Input Split " + fileSplit.getPath() + 
-					" [" + splitStart + "," + splitLength + "]: " + t.getMessage(), t);
-		}
-		
+
 		// get FSDataInputStream
 		if (this.splitStart != 0) {
 			this.stream.seek(this.splitStart);
@@ -862,13 +865,12 @@ public void close() throws IOException {
 			stream = null;
 		}
 	}
-	
+
 	/**
 	 * Override this method to supports multiple paths.
 	 * When this method will be removed, all FileInputFormats have to support multiple paths.
 	 *
 	 * @return True if the FileInputFormat supports multiple paths, false otherwise.
-	 *
 	 * @deprecated Will be removed for Flink 2.0.
 	 */
 	@Deprecated
@@ -876,21 +878,74 @@ public boolean supportsMultiPaths() {
 		return false;
 	}
 
+	@Override
 	public String toString() {
 		return getFilePaths() == null || getFilePaths().length == 0 ?
 			"File Input (unknown file)" :
-			"File Input (" +  Arrays.toString(this.getFilePaths()) + ')';
+			"File Input (" + Arrays.toString(this.getFilePaths()) + ')';
+	}
+
+	/**
+	 * Get file bom encoding
+	 *
+	 * @param fs
+	 * @return
+	 */
+	public String getBomCharset(FileStatus fs) {
+		FSDataInputStream inStream = null;
+		String charset, testFileSystem = "TestFileSystem";
+		byte[] bom = new byte[4];
+		byte[] bytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, (byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+		try {
+			/*
+			 * int read() Reads a data byte from this input stream. Int read(byte[] b) will be at most b.length from this input stream
+			 * Bytes of data are read into a byte array. Int read(byte[] b, int off, int len)
+			 * Reads up to len bytes of data from this input stream into a byte array.
+			 */
+			FileSystem fileSystem = fs.getPath().getFileSystem();
+			if (testFileSystem.equals(fileSystem.getClass().getSimpleName())) {
+				fileSystem = new LocalFileSystem();
+			}
+
+			inStream = fileSystem.open(fs.getPath());
+			inStream.read(bom, 0, bom.length);
+
+			if ((bom[0] == bytes[0]) && (bom[1] == bytes[0]) && (bom[2] == bytes[1]) && (bom[3] == bytes[2])) {
+				charset = "UTF-32BE";
+			} else if ((bom[0] == bytes[2]) && (bom[1] == bytes[1]) && (bom[2] == bytes[0]) && (bom[3] == bytes[0])) {
+				charset = "UTF-32LE";
+			} else if ((bom[0] == bytes[3]) && (bom[1] == bytes[4]) && (bom[2] == bytes[5])) {
+				charset = "UTF-8";
+			} else if ((bom[0] == bytes[1]) && (bom[1] == bytes[2])) {
+				charset = "UTF-16BE";
+			} else if ((bom[0] == bytes[2]) && (bom[1] == bytes[1])) {
+				charset = "UTF-16LE";
+			} else {
+				charset = null;
+			}
+		} catch (Exception e) {
+			throw new IllegalArgumentException("Failed to get file bom encoding.");
+		} finally {
+			if (null != inStream) {
+				try {
+					inStream.close();
+				} catch (IOException e) {
+					e.printStackTrace();
+				}
+			}
+		}
+		return charset;
 	}
 
 	// ============================================================================================
-	
+
 	/**
 	 * Encapsulation of the basic statistics the optimizer obtains about a file. Contained are the size of the file
 	 * and the average bytes of a single record. The statistics also have a time-stamp that records the modification
 	 * time of the file and indicates as such for which time the statistics were valid.
 	 */
 	public static class FileBaseStatistics implements BaseStatistics {
-		
+
 		protected final long fileModTime; // timestamp of the last modification
 
 		protected final long fileSize; // size of the file(s) in bytes
@@ -899,13 +954,10 @@ public String toString() {
 
 		/**
 		 * Creates a new statistics object.
-		 * 
-		 * @param fileModTime
-		 *        The timestamp of the latest modification of any of the involved files.
-		 * @param fileSize
-		 *        The size of the file, in bytes. <code>-1</code>, if unknown.
-		 * @param avgBytesPerRecord
-		 *        The average number of byte in a record, or <code>-1.0f</code>, if unknown.
+		 *
+		 * @param fileModTime       The timestamp of the latest modification of any of the involved files.
+		 * @param fileSize          The size of the file, in bytes. <code>-1</code>, if unknown.
+		 * @param avgBytesPerRecord The average number of byte in a record, or <code>-1.0f</code>, if unknown.
 		 */
 		public FileBaseStatistics(long fileModTime, long fileSize, float avgBytesPerRecord) {
 			this.fileModTime = fileModTime;
@@ -915,7 +967,7 @@ public FileBaseStatistics(long fileModTime, long fileSize, float avgBytesPerReco
 
 		/**
 		 * Gets the timestamp of the last modification.
-		 * 
+		 *
 		 * @return The timestamp of the last modification.
 		 */
 		public long getLastModificationTime() {
@@ -924,7 +976,7 @@ public long getLastModificationTime() {
 
 		/**
 		 * Gets the file size.
-		 * 
+		 *
 		 * @return The fileSize.
 		 * @see org.apache.flink.api.common.io.statistics.BaseStatistics#getTotalInputSize()
 		 */
@@ -936,19 +988,19 @@ public long getTotalInputSize() {
 		/**
 		 * Gets the estimates number of records in the file, computed as the file size divided by the
 		 * average record width, rounded up.
-		 * 
+		 *
 		 * @return The estimated number of records in the file.
 		 * @see org.apache.flink.api.common.io.statistics.BaseStatistics#getNumberOfRecords()
 		 */
 		@Override
 		public long getNumberOfRecords() {
-			return (this.fileSize == SIZE_UNKNOWN || this.avgBytesPerRecord == AVG_RECORD_BYTES_UNKNOWN) ? 
+			return (this.fileSize == SIZE_UNKNOWN || this.avgBytesPerRecord == AVG_RECORD_BYTES_UNKNOWN) ?
 				NUM_RECORDS_UNKNOWN : (long) Math.ceil(this.fileSize / this.avgBytesPerRecord);
 		}
 
 		/**
 		 * Gets the estimated average number of bytes per record.
-		 * 
+		 *
 		 * @return The average number of bytes per record.
 		 * @see org.apache.flink.api.common.io.statistics.BaseStatistics#getAverageRecordWidth()
 		 */
@@ -956,35 +1008,35 @@ public long getNumberOfRecords() {
 		public float getAverageRecordWidth() {
 			return this.avgBytesPerRecord;
 		}
-		
+
 		@Override
 		public String toString() {
 			return "size=" + this.fileSize + ", recWidth=" + this.avgBytesPerRecord + ", modAt=" + this.fileModTime;
 		}
 	}
-	
+
 	// ============================================================================================
-	
+
 	/**
 	 * Obtains a DataInputStream in an thread that is not interrupted.
 	 * This is a necessary hack around the problem that the HDFS client is very sensitive to InterruptedExceptions.
 	 */
 	public static class InputSplitOpenThread extends Thread {
-		
+
 		private final FileInputSplit split;
-		
+
 		private final long timeout;
 
 		private volatile FSDataInputStream fdis;
 
 		private volatile Throwable error;
-		
+
 		private volatile boolean aborted;
 
 		public InputSplitOpenThread(FileInputSplit split, long timeout) {
 			super("Transient InputSplit Opener");
 			setDaemon(true);
-			
+
 			this.split = split;
 			this.timeout = timeout;
 		}
@@ -994,37 +1046,35 @@ public void run() {
 			try {
 				final FileSystem fs = FileSystem.get(this.split.getPath().toUri());
 				this.fdis = fs.open(this.split.getPath());
-				
+
 				// check for canceling and close the stream in that case, because no one will obtain it
 				if (this.aborted) {
 					final FSDataInputStream f = this.fdis;
 					this.fdis = null;
 					f.close();
 				}
-			}
-			catch (Throwable t) {
+			} catch (Throwable t) {
 				this.error = t;
 			}
 		}
-		
+
 		public FSDataInputStream waitForCompletion() throws Throwable {
 			final long start = System.currentTimeMillis();
 			long remaining = this.timeout;
-			
+
 			do {
 				try {
 					// wait for the task completion
 					this.join(remaining);
-				}
-				catch (InterruptedException iex) {
+				} catch (InterruptedException iex) {
 					// we were canceled, so abort the procedure
 					abortWait();
 					throw iex;
 				}
 			}
 			while (this.error == null && this.fdis == null &&
-					(remaining = this.timeout + start - System.currentTimeMillis()) > 0);
-			
+				(remaining = this.timeout + start - System.currentTimeMillis()) > 0);
+
 			if (this.error != null) {
 				throw this.error;
 			}
@@ -1036,17 +1086,17 @@ public FSDataInputStream waitForCompletion() throws Throwable {
 				// b) the flag was set such that the stream did not see it and we have a valid stream
 				// In any case, close the stream and throw an exception.
 				abortWait();
-				
+
 				final boolean stillAlive = this.isAlive();
 				final StringBuilder bld = new StringBuilder(256);
 				for (StackTraceElement e : this.getStackTrace()) {
 					bld.append("\tat ").append(e.toString()).append('\n');
 				}
-				throw new IOException("Input opening request timed out. Opener was " + (stillAlive ? "" : "NOT ") + 
+				throw new IOException("Input opening request timed out. Opener was " + (stillAlive ? "" : "NOT ") +
 					" alive. Stack of split open thread:\n" + bld.toString());
 			}
 		}
-		
+
 		/**
 		 * Double checked procedure setting the abort flag and closing the stream.
 		 */
@@ -1057,17 +1107,18 @@ private void abortWait() {
 			if (inStream != null) {
 				try {
 					inStream.close();
-				} catch (Throwable t) {}
+				} catch (Throwable t) {
+				}
 			}
 		}
 	}
-	
+
 	// ============================================================================================
 	//  Parameterization via configuration
 	// ============================================================================================
-	
+
 	// ------------------------------------- Config Keys ------------------------------------------
-	
+
 	/**
 	 * The config parameter which defines the input file path.
 	 */
@@ -1077,4 +1128,5 @@ private void abortWait() {
 	 * The config parameter which defines whether input directories are recursively traversed.
 	 */
 	public static final String ENUMERATE_NESTED_FILES_FLAG = "recursive.file.enumeration";
+
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
index b53ac4b4924..9e28522ddc3 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
@@ -39,6 +39,9 @@
 	/** The number of bytes in the file to process. */
 	private final long length;
 
+	/** The charset of bom in the file to process. */
+	private String bomCharsetName;
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -63,6 +66,31 @@ public FileInputSplit(int num, Path file, long start, long length, String[] host
 		this.length = length;
 	}
 
+	/**
+	 * Constructs a split with host information.
+	 *
+	 * @param num
+	 *        the number of this input split
+	 * @param file
+	 *        the file name
+	 * @param start
+	 *        the position of the first byte in the file to process
+	 * @param length
+	 *        the number of bytes in the file to process (-1 is flag for "read whole file")
+	 * @param hosts
+	 *        the list of hosts containing the block, possibly <code>null</code>
+	 * @param bomCharsetName
+	 *        The charset of bom in the file to process (default is UTF-8)
+	 */
+	public FileInputSplit(int num, Path file, long start, long length, String[] hosts, String bomCharsetName) {
+		super(num, hosts);
+
+		this.file = file;
+		this.start = start;
+		this.length = length;
+		this.bomCharsetName = bomCharsetName;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -92,6 +120,15 @@ public long getLength() {
 		return length;
 	}
 
+	/**
+	 * Returns the charset of bom in the file.
+	 *
+	 * @return the charset of bom in the file
+	 */
+	public String getBomCharsetName(){
+		return bomCharsetName;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
index 82793adc137..7541d48bbe0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
@@ -59,7 +59,12 @@ public TextInputFormat(Path filePath) {
 	// --------------------------------------------------------------------------------------------
 
 	public String getCharsetName() {
-		return charsetName;
+		String bomCharsetName = getBomCharsetName();
+		if (bomCharsetName != null && !bomCharsetName.equals(charsetName)) {
+			return bomCharsetName;
+		} else {
+			return charsetName;
+		}
 	}
 
 	public void setCharsetName(String charsetName) {
@@ -85,14 +90,26 @@ public void configure(Configuration parameters) {
 
 	@Override
 	public String readRecord(String reusable, byte[] bytes, int offset, int numBytes) throws IOException {
+		String utf8 = "UTF-8";
+		String utf16 = "UTF-16";
+		String utf32 = "UTF-32";
+		int stepSize = 0;
+		String charsetName = this.getCharsetName();
+		if (charsetName.contains(utf8)) {
+			stepSize = 1;
+		} else if (charsetName.contains(utf16)) {
+			stepSize = 2;
+		} else if (charsetName.contains(utf32)) {
+			stepSize = 4;
+		}
 		//Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
 		if (this.getDelimiter() != null && this.getDelimiter().length == 1
-				&& this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 1
-				&& bytes[offset + numBytes - 1] == CARRIAGE_RETURN){
-			numBytes -= 1;
+			&& this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= stepSize
+			&& bytes[offset + numBytes - stepSize] == CARRIAGE_RETURN) {
+			numBytes -= stepSize;
 		}
-
-		return new String(bytes, offset, numBytes, this.charsetName);
+		numBytes = numBytes - stepSize + 1;
+		return new String(bytes, offset, numBytes, this.getCharsetName());
 	}
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
index e78232ac1e5..205dddb7639 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
@@ -55,7 +55,7 @@ public void testSimpleRead() {
 			tempFile.deleteOnExit();
 			tempFile.setWritable(true);
 
-			PrintStream ps = new  PrintStream(tempFile);
+			PrintStream ps = new PrintStream(tempFile);
 			ps.println(first);
 			ps.println(second);
 			ps.close();
@@ -83,8 +83,7 @@ public void testSimpleRead() {
 			assertEquals(second, result);
 
 			assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result));
-		}
-		catch (Throwable t) {
+		} catch (Throwable t) {
 			System.err.println("test failed with exception: " + t.getMessage());
 			t.printStackTrace(System.err);
 			fail("Test erroneous");
@@ -93,11 +92,11 @@ public void testSimpleRead() {
 
 	@Test
 	public void testNestedFileRead() {
-		String[] dirs = new String[] {"tmp/first/", "tmp/second/"};
+		String[] dirs = new String[]{"tmp/first/", "tmp/second/"};
 		List<String> expectedFiles = new ArrayList<>();
 
 		try {
-			for (String dir: dirs) {
+			for (String dir : dirs) {
 				// create input file
 				File tmpDir = new File(dir);
 				if (!tmpDir.exists()) {
@@ -127,7 +126,7 @@ public void testNestedFileRead() {
 			FileInputSplit[] splits = inputFormat.createInputSplits(expectedFiles.size());
 
 			List<String> paths = new ArrayList<>();
-			for (FileInputSplit split: splits) {
+			for (FileInputSplit split : splits) {
 				paths.add(split.getPath().toString());
 			}
 
@@ -188,7 +187,7 @@ private void testRemovingTrailingCR(String lineBreaker, String delimiter) {
 
 			String result = "";
 			if ((delimiter.equals("\n") && (lineBreaker.equals("\n") || lineBreaker.equals("\r\n")))
-					|| (lineBreaker.equals(delimiter))){
+				|| (lineBreaker.equals(delimiter))) {
 
 				result = inputFormat.nextRecord("");
 				assertNotNull("Expecting first record here", result);
@@ -207,8 +206,110 @@ private void testRemovingTrailingCR(String lineBreaker, String delimiter) {
 				assertEquals(content, result);
 			}
 
+		} catch (Throwable t) {
+			System.err.println("test failed with exception: " + t.getMessage());
+			t.printStackTrace(System.err);
+			fail("Test erroneous");
+		}
+	}
+
+	@Test
+	public void testUTF16Read() {
+		final String first = "First line";
+		final String second = "Second line";
+
+		try {
+			// create input file
+			File tempFile = File.createTempFile("TextInputFormatTest", "tmp");
+			tempFile.deleteOnExit();
+			tempFile.setWritable(true);
+
+			PrintStream ps = new PrintStream(tempFile, "UTF-16");
+			ps.println(first);
+			ps.println(second);
+			ps.close();
+
+			TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
+			inputFormat.setCharsetName("UTF-32");
+
+			Configuration parameters = new Configuration();
+			inputFormat.configure(parameters);
+
+//			inputFormat.setDelimiter("\r");
+//			inputFormat.setDelimiter("i");
+
+			FileInputSplit[] splits = inputFormat.createInputSplits(1);
+			assertTrue("expected at least one input split", splits.length >= 1);
+			inputFormat.open(splits[0]);
+
+			String result = "";
+
+			System.out.println("bomCharsetName:" + inputFormat.getBomCharsetName());
+
+			assertFalse(inputFormat.reachedEnd());
+			result = inputFormat.nextRecord("");
+			System.out.println(result);
+			assertNotNull("Expecting first record here", result);
+			assertEquals(first, result.substring(1));
+
+			assertFalse(inputFormat.reachedEnd());
+			result = inputFormat.nextRecord(result);
+			System.out.println(result);
+			assertNotNull("Expecting second record here", result);
+			assertEquals(second, result);
+
+			assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result));
+		} catch (Throwable t) {
+			System.err.println("test failed with exception: " + t.getMessage());
+			t.printStackTrace(System.err);
+			fail("Test erroneous");
 		}
-		catch (Throwable t) {
+	}
+
+	@Test
+	public void testUTF32Read() {
+		final String first = "First line";
+		final String second = "Second line";
+
+		try {
+			// create input file
+			File tempFile = File.createTempFile("TextInputFormatTest", "tmp");
+			tempFile.deleteOnExit();
+			tempFile.setWritable(true);
+
+			PrintStream ps = new PrintStream(tempFile, "UTF-32");
+			ps.println(first);
+			ps.println(second);
+			ps.close();
+
+			TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
+			inputFormat.setCharsetName("UTF-32");
+
+			Configuration parameters = new Configuration();
+			inputFormat.configure(parameters);
+
+			FileInputSplit[] splits = inputFormat.createInputSplits(1);
+			assertTrue("expected at least one input split", splits.length >= 1);
+			inputFormat.open(splits[0]);
+
+			String result = "";
+
+			System.out.println("bomCharsetName:" + inputFormat.getBomCharsetName());
+
+			assertFalse(inputFormat.reachedEnd());
+			result = inputFormat.nextRecord("");
+			System.out.println(result);
+			assertNotNull("Expecting first record here", result);
+			assertEquals(first, result);
+
+			assertFalse(inputFormat.reachedEnd());
+			result = inputFormat.nextRecord(result);
+			System.out.println(result);
+			assertNotNull("Expecting second record here", result);
+			assertEquals(second, result);
+
+			assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result));
+		} catch (Throwable t) {
 			System.err.println("test failed with exception: " + t.getMessage());
 			t.printStackTrace(System.err);
 			fail("Test erroneous");


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services