You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/21 10:36:49 UTC

[GitHub] XuQianJin-Stars closed pull request #7092: [FLINK-10134] UTF-16 support for TextInputFormat bug

XuQianJin-Stars closed pull request #7092: [FLINK-10134] UTF-16 support for TextInputFormat bug
URL: https://github.com/apache/flink/pull/7092
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index c1ef344175b..af99f0c9d54 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -28,6 +28,7 @@
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.LRUCache;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,21 +37,22 @@
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Map;
 
 /**
  * Base implementation for input formats that split the input at a delimiter into records.
  * The parsing of the record bytes into the record has to be implemented in the
  * {@link #readRecord(Object, byte[], int, int)} method.
- * 
+ *
  * <p>The default delimiter is the newline character {@code '\n'}.</p>
  */
 @Public
 public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements CheckpointableInputFormat<FileInputSplit, Long> {
-	
+
 	private static final long serialVersionUID = 1L;
 
 	// -------------------------------------- Constants -------------------------------------------
-	
+
 	/**
 	 * The log.
 	 */
@@ -62,26 +64,46 @@
 	// Charset is not serializable
 	private transient Charset charset;
 
+	/**
+	 * The charset identified from the byte order mark (BOM) of the file being processed.
+	 */
+	private transient Charset bomIdentifiedCharset;
+
+	/**
+	 * This is the charset that is configured via setCharset().
+	 */
+	private transient Charset configuredCharset;
+
+	/**
+	 * Cache that maps each file path to the charset identified from that file's BOM.
+	 */
+	private transient final Map<String, Charset> fileBomCharsetMap;
+
+	/**
+	 * The byte values compared against a file's leading bytes when checking for a BOM.
+	 */
+	byte[] bomBytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, (byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+
 	/**
 	 * The default read buffer size = 1MB.
 	 */
 	private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;
-	
+
 	/**
 	 * Indication that the number of samples has not been set by the configuration.
 	 */
 	private static final int NUM_SAMPLES_UNDEFINED = -1;
-	
+
 	/**
 	 * The maximum number of line samples to be taken.
 	 */
 	private static int DEFAULT_MAX_NUM_SAMPLES;
-	
+
 	/**
 	 * The minimum number of line samples to be taken.
 	 */
 	private static int DEFAULT_MIN_NUM_SAMPLES;
-	
+
 	/**
 	 * The maximum size of a sample record before sampling is aborted. To catch cases where a wrong delimiter is given.
 	 */
@@ -98,7 +120,7 @@ protected static void loadGlobalConfigParams() {
 	protected static void loadConfigParameters(Configuration parameters) {
 		int maxSamples = parameters.getInteger(OptimizerOptions.DELIMITED_FORMAT_MAX_LINE_SAMPLES);
 		int minSamples = parameters.getInteger(OptimizerOptions.DELIMITED_FORMAT_MIN_LINE_SAMPLES);
-		
+
 		if (maxSamples < 0) {
 			LOG.error("Invalid default maximum number of line samples: " + maxSamples + ". Using default value of " +
 				OptimizerOptions.DELIMITED_FORMAT_MAX_LINE_SAMPLES.key());
@@ -109,17 +131,17 @@ protected static void loadConfigParameters(Configuration parameters) {
 				OptimizerOptions.DELIMITED_FORMAT_MIN_LINE_SAMPLES.key());
 			minSamples = OptimizerOptions.DELIMITED_FORMAT_MIN_LINE_SAMPLES.defaultValue();
 		}
-		
+
 		DEFAULT_MAX_NUM_SAMPLES = maxSamples;
-		
+
 		if (minSamples > maxSamples) {
 			LOG.error("Default minimum number of line samples cannot be greater the default maximum number " +
-					"of line samples: min=" + minSamples + ", max=" + maxSamples + ". Defaulting minimum to maximum.");
+				"of line samples: min=" + minSamples + ", max=" + maxSamples + ". Defaulting minimum to maximum.");
 			DEFAULT_MIN_NUM_SAMPLES = maxSamples;
 		} else {
 			DEFAULT_MIN_NUM_SAMPLES = minSamples;
 		}
-		
+
 		int maxLen = parameters.getInteger(OptimizerOptions.DELIMITED_FORMAT_MAX_SAMPLE_LEN);
 		if (maxLen <= 0) {
 			maxLen = OptimizerOptions.DELIMITED_FORMAT_MAX_SAMPLE_LEN.defaultValue();
@@ -130,12 +152,12 @@ protected static void loadConfigParameters(Configuration parameters) {
 		}
 		MAX_SAMPLE_LEN = maxLen;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Variables for internal parsing.
-	//  They are all transient, because we do not want them so be serialized 
+	//  They are all transient, because we do not want them to be serialized
 	// --------------------------------------------------------------------------------------------
-	
+
 	private transient byte[] readBuffer;
 
 	private transient byte[] wrapBuffer;
@@ -164,12 +186,12 @@ protected static void loadConfigParameters(Configuration parameters) {
 	private String delimiterString = null;
 
 	private int lineLengthLimit = Integer.MAX_VALUE;
-	
+
 	private int bufferSize = -1;
-	
+
 	private int numLineSamples = NUM_SAMPLES_UNDEFINED;
-	
-	
+
+
 	// --------------------------------------------------------------------------------------------
 	//  Constructors & Getters/setters for the configurable parameters
 	// --------------------------------------------------------------------------------------------
@@ -184,6 +206,7 @@ protected DelimitedInputFormat(Path filePath, Configuration configuration) {
 			configuration = GlobalConfiguration.loadConfiguration();
 		}
 		loadConfigParameters(configuration);
+		this.fileBomCharsetMap = new LRUCache<>(1024);
 	}
 
 	/**
@@ -195,7 +218,11 @@ protected DelimitedInputFormat(Path filePath, Configuration configuration) {
 	 */
 	@PublicEvolving
 	public Charset getCharset() {
-		if (this.charset == null) {
+		if (this.configuredCharset != null) {
+			this.charset = this.configuredCharset;
+		} else if (this.bomIdentifiedCharset != null) {
+			this.charset = this.bomIdentifiedCharset;
+		} else {
 			this.charset = Charset.forName(charsetName);
 		}
 		return this.charset;
@@ -214,7 +241,7 @@ public Charset getCharset() {
 	@PublicEvolving
 	public void setCharset(String charset) {
 		this.charsetName = Preconditions.checkNotNull(charset);
-		this.charset = null;
+		this.configuredCharset = getSpecialCharset(charset);
 
 		if (this.delimiterString != null) {
 			this.delimiter = delimiterString.getBytes(getCharset());
@@ -224,7 +251,7 @@ public void setCharset(String charset) {
 	public byte[] getDelimiter() {
 		return delimiter;
 	}
-	
+
 	public void setDelimiter(byte[] delimiter) {
 		if (delimiter == null) {
 			throw new IllegalArgumentException("Delimiter must not be null");
@@ -236,7 +263,7 @@ public void setDelimiter(byte[] delimiter) {
 	public void setDelimiter(char delimiter) {
 		setDelimiter(String.valueOf(delimiter));
 	}
-	
+
 	public void setDelimiter(String delimiter) {
 		if (delimiter == null) {
 			throw new IllegalArgumentException("Delimiter must not be null");
@@ -244,11 +271,11 @@ public void setDelimiter(String delimiter) {
 		this.delimiter = delimiter.getBytes(getCharset());
 		this.delimiterString = delimiter;
 	}
-	
+
 	public int getLineLengthLimit() {
 		return lineLengthLimit;
 	}
-	
+
 	public void setLineLengthLimit(int lineLengthLimit) {
 		if (lineLengthLimit < 1) {
 			throw new IllegalArgumentException("Line length limit must be at least 1.");
@@ -256,11 +283,11 @@ public void setLineLengthLimit(int lineLengthLimit) {
 
 		this.lineLengthLimit = lineLengthLimit;
 	}
-	
+
 	public int getBufferSize() {
 		return bufferSize;
 	}
-	
+
 	public void setBufferSize(int bufferSize) {
 		if (bufferSize < 2) {
 			throw new IllegalArgumentException("Buffer size must be at least 2.");
@@ -268,11 +295,11 @@ public void setBufferSize(int bufferSize) {
 
 		this.bufferSize = bufferSize;
 	}
-	
+
 	public int getNumLineSamples() {
 		return numLineSamples;
 	}
-	
+
 	public void setNumLineSamples(int numLineSamples) {
 		if (numLineSamples < 0) {
 			throw new IllegalArgumentException("Number of line samples must not be negative.");
@@ -287,25 +314,25 @@ public void setNumLineSamples(int numLineSamples) {
 	/**
 	 * This function parses the given byte array which represents a serialized record.
 	 * The function returns a valid record or throws an IOException.
-	 * 
+	 *
 	 * @param reuse An optionally reusable object.
 	 * @param bytes Binary data of serialized records.
-	 * @param offset The offset where to start to read the record data. 
+	 * @param offset The offset where to start to read the record data.
 	 * @param numBytes The number of bytes that can be read starting at the offset position.
-	 * 
+	 *
 	 * @return Returns the read record if it was successfully deserialized.
 	 * @throws IOException if the record could not be read.
 	 */
 	public abstract OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) throws IOException;
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Pre-flight: Configuration, Splits, Sampling
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Configures this input format by reading the path to the file from the configuration and the string that
 	 * defines the record delimiter.
-	 * 
+	 *
 	 * @param parameters The configuration object to read the parameters from.
 	 */
 	@Override
@@ -321,7 +348,7 @@ public void configure(Configuration parameters) {
 				setDelimiter(delimString);
 			}
 		}
-		
+
 		// set the number of samples
 		if (numLineSamples == NUM_SAMPLES_UNDEFINED) {
 			String samplesString = parameters.getString(NUM_STATISTICS_SAMPLES, null);
@@ -337,13 +364,13 @@ public void configure(Configuration parameters) {
 			}
 		}
 	}
-	
+
 	@Override
 	public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
-		
+
 		final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ?
-				(FileBaseStatistics) cachedStats : null;
-		
+			(FileBaseStatistics) cachedStats : null;
+
 		// store properties
 		final long oldTimeout = this.openTimeout;
 		final int oldBufferSize = this.bufferSize;
@@ -357,11 +384,11 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 			if (stats == null) {
 				return null;
 			}
-			
+
 			// check whether the width per record is already known or the total size is unknown as well
 			// in both cases, we return the stats as they are
 			if (stats.getAverageRecordWidth() != FileBaseStatistics.AVG_RECORD_BYTES_UNKNOWN ||
-					stats.getTotalInputSize() == FileBaseStatistics.SIZE_UNKNOWN) {
+				stats.getTotalInputSize() == FileBaseStatistics.SIZE_UNKNOWN) {
 				return stats;
 			}
 
@@ -370,7 +397,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 			if (unsplittable) {
 				return stats;
 			}
-			
+
 			// compute how many samples to take, depending on the defined upper and lower bound
 			final int numSamples;
 			if (this.numLineSamples != NUM_SAMPLES_UNDEFINED) {
@@ -380,7 +407,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 				final int calcSamples = (int) (stats.getTotalInputSize() / 1024);
 				numSamples = Math.min(DEFAULT_MAX_NUM_SAMPLES, Math.max(DEFAULT_MIN_NUM_SAMPLES, calcSamples));
 			}
-			
+
 			// check if sampling is disabled.
 			if (numSamples == 0) {
 				return stats;
@@ -388,15 +415,15 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 			if (numSamples < 0) {
 				throw new RuntimeException("Error: Invalid number of samples: " + numSamples);
 			}
-			
-			
+
+
 			// make sure that the sampling times out after a while if the file system does not answer in time
 			this.openTimeout = 10000;
 			// set a small read buffer size
 			this.bufferSize = 4 * 1024;
 			// prevent overly large records, for example if we have an incorrectly configured delimiter
 			this.lineLengthLimit = MAX_SAMPLE_LEN;
-			
+
 			long offset = 0;
 			long totalNumBytes = 0;
 			long stepSize = stats.getTotalInputSize() / numSamples;
@@ -430,21 +457,21 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 					fileNum++;
 				}
 			}
-			
+
 			// we have the width, store it
 			return new FileBaseStatistics(stats.getLastModificationTime(),
 				stats.getTotalInputSize(), totalNumBytes / (float) samplesTaken);
-			
+
 		} catch (IOException ioex) {
 			if (LOG.isWarnEnabled()) {
 				LOG.warn("Could not determine statistics for files '" + Arrays.toString(getFilePaths()) + "' " +
-						 "due to an io error: " + ioex.getMessage());
+					"due to an io error: " + ioex.getMessage());
 			}
 		}
 		catch (Throwable t) {
 			if (LOG.isErrorEnabled()) {
 				LOG.error("Unexpected problem while getting the file statistics for files '" + Arrays.toString(getFilePaths()) + "': "
-						+ t.getMessage(), t);
+					+ t.getMessage(), t);
 			}
 		} finally {
 			// restore properties (even on return)
@@ -452,7 +479,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 			this.bufferSize = oldBufferSize;
 			this.lineLengthLimit = oldLineLengthLimit;
 		}
-		
+
 		// no statistics possible
 		return null;
 	}
@@ -472,6 +499,7 @@ public void open(FileInputSplit split) throws IOException {
 
 		this.offset = splitStart;
 		if (this.splitStart != 0) {
+			setBomFileCharset(split);
 			this.stream.seek(offset);
 			readLine();
 			// if the first partial record already pushes the stream over
@@ -481,7 +509,9 @@ public void open(FileInputSplit split) throws IOException {
 			}
 		} else {
 			fillBuffer(0);
+			setBomFileCharset(split);
 		}
+
 	}
 
 	private void initBuffers() {
@@ -504,16 +534,85 @@ private void initBuffers() {
 		this.end = false;
 	}
 
+	/**
+	 * Resolves a charset name, mapping "UTF-16" and "UTF-32" to their big-endian variants.
+	 *
+	 * @param charsetName
+	 * @return
+	 */
+	private Charset getSpecialCharset(String charsetName) {
+		Charset charset;
+		switch (charsetName.toUpperCase()) {
+			case "UTF-16":
+				charset = Charset.forName("UTF-16BE");
+				break;
+			case "UTF-32":
+				charset = Charset.forName("UTF-32BE");
+				break;
+			default:
+				charset = Charset.forName(charsetName);
+				break;
+		}
+		return charset;
+	}
+
+	/**
+	 * Determines the charset from the file's BOM, unless a charset was configured explicitly.
+	 *
+	 * @param split
+	 */
+	private void setBomFileCharset(FileInputSplit split) {
+		try {
+			if (configuredCharset == null) {
+				String filePath = split.getPath().toString();
+				if (this.fileBomCharsetMap.containsKey(filePath)) {
+					this.bomIdentifiedCharset = this.fileBomCharsetMap.get(filePath);
+				} else {
+					byte[] bomBuffer = new byte[4];
+
+					if (this.splitStart != 0) {
+						this.stream.seek(0);
+						this.stream.read(bomBuffer, 0, bomBuffer.length);
+						this.stream.seek(split.getStart());
+					} else {
+						System.arraycopy(this.readBuffer, 0, bomBuffer, 0, 4);
+					}
+
+					if ((bomBuffer[0] == bomBytes[0]) && (bomBuffer[1] == bomBytes[0]) && (bomBuffer[2] == bomBytes[1])
+						&& (bomBuffer[3] == bomBytes[2])) {
+						this.bomIdentifiedCharset = Charset.forName("UTF-32BE");
+					} else if ((bomBuffer[0] == bomBytes[2]) && (bomBuffer[1] == bomBytes[1]) && (bomBuffer[2] == bomBytes[0])
+						&& (bomBuffer[3] == bomBytes[0])) {
+						this.bomIdentifiedCharset = Charset.forName("UTF-32LE");
+					} else if ((bomBuffer[0] == bomBytes[3]) && (bomBuffer[1] == bomBytes[4]) && (bomBuffer[2] == bomBytes[5])) {
+						this.bomIdentifiedCharset = Charset.forName("UTF-8");
+					} else if ((bomBuffer[0] == bomBytes[1]) && (bomBuffer[1] == bomBytes[2])) {
+						this.bomIdentifiedCharset = Charset.forName("UTF-16BE");
+					} else if ((bomBuffer[0] == bomBytes[2]) && (bomBuffer[1] == bomBytes[1])) {
+						this.bomIdentifiedCharset = Charset.forName("UTF-16LE");
+					} else {
+						this.bomIdentifiedCharset = Charset.forName(charsetName);
+					}
+
+					this.fileBomCharsetMap.put(filePath, this.bomIdentifiedCharset);
+				}
+			}
+		} catch (Exception e) {
+			LOG.warn("Failed to get file bom encoding.");
+			this.bomIdentifiedCharset = Charset.forName(charsetName);
+		}
+	}
+
 	/**
 	 * Checks whether the current split is at its end.
-	 * 
+	 *
 	 * @return True, if the split is at its end, false otherwise.
 	 */
 	@Override
 	public boolean reachedEnd() {
 		return this.end;
 	}
-	
+
 	@Override
 	public OT nextRecord(OT record) throws IOException {
 		if (readLine()) {
@@ -526,7 +625,7 @@ public OT nextRecord(OT record) throws IOException {
 
 	/**
 	 * Closes the input by releasing all buffers and closing the file input stream.
-	 * 
+	 *
 	 * @throws IOException Thrown, if the closing of the file stream causes an I/O error.
 	 */
 	@Override
@@ -623,11 +722,11 @@ protected final boolean readLine() throws IOException {
 			} else {
 				// we reached the end of the readBuffer
 				count = this.limit - startPos;
-				
+
 				// check against the maximum record length
 				if (((long) countInWrapBuffer) + count > this.lineLengthLimit) {
-					throw new IOException("The record length exceeded the maximum record length (" + 
-							this.lineLengthLimit + ").");
+					throw new IOException("The record length exceeded the maximum record length (" +
+						this.lineLengthLimit + ").");
 				}
 
 				// Compute number of bytes to move to wrapBuffer
@@ -650,7 +749,7 @@ protected final boolean readLine() throws IOException {
 			}
 		}
 	}
-	
+
 	private void setResult(byte[] buffer, int offset, int len) {
 		this.currBuffer = buffer;
 		this.currOffset = offset;
@@ -675,7 +774,7 @@ private boolean fillBuffer(int offset) throws IOException {
 				return true;
 			}
 		}
-		
+
 		// else ..
 		int toRead;
 		if (this.splitLength > 0) {
@@ -714,7 +813,7 @@ private boolean fillBuffer(int offset) throws IOException {
 	 * The configuration key to set the record delimiter.
 	 */
 	protected static final String RECORD_DELIMITER = "delimited-format.delimiter";
-	
+
 	/**
 	 * The configuration key to set the number of samples to take for the statistics.
 	 */
diff --git a/flink-core/src/main/java/org/apache/flink/util/LRUCache.java b/flink-core/src/main/java/org/apache/flink/util/LRUCache.java
new file mode 100644
index 00000000000..ee7aa2516ca
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/LRUCache.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * An LRU cache backed by an access-ordered {@link LinkedHashMap}.
+ */
+public class LRUCache<K, V> extends LinkedHashMap<K, V> implements java.io.Serializable {
+
+	private final int maxCacheSize;
+
+	public LRUCache(int cacheSize) {
+		super((int) Math.ceil(cacheSize / 0.75) + 1, 0.75f, true);
+		maxCacheSize = cacheSize;
+	}
+
+	@Override
+	protected boolean removeEldestEntry(Map.Entry eldest) {
+		return size() > maxCacheSize;
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		for (Map.Entry<K, V> entry : entrySet()) {
+			sb.append(String.format("%s:%s ", entry.getKey(), entry.getValue()));
+		}
+		return sb.toString();
+	}
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
index 82793adc137..25a1c86e44c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
@@ -21,6 +21,7 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 
 import java.io.IOException;
@@ -58,20 +59,34 @@ public TextInputFormat(Path filePath) {
 
 	// --------------------------------------------------------------------------------------------
 
-	public String getCharsetName() {
-		return charsetName;
-	}
-
 	public void setCharsetName(String charsetName) {
 		if (charsetName == null) {
 			throw new IllegalArgumentException("Charset must not be null.");
 		}
 
 		this.charsetName = charsetName;
+		this.setCharset(charsetName);
+	}
+
+	/**
+	 * Re-encodes the single-byte newline delimiter using the current charset.
+	 */
+	private void setSpecialDelimiter() {
+		String delimiterString = "\n";
+		if (this.getDelimiter() != null && this.getDelimiter().length == 1
+			&& this.getDelimiter()[0] == (byte) '\n') {
+			this.setDelimiter(delimiterString);
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
 
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+		setSpecialDelimiter();
+	}
+
 	@Override
 	public void configure(Configuration parameters) {
 		super.configure(parameters);
@@ -87,12 +102,12 @@ public void configure(Configuration parameters) {
 	public String readRecord(String reusable, byte[] bytes, int offset, int numBytes) throws IOException {
 		//Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
 		if (this.getDelimiter() != null && this.getDelimiter().length == 1
-				&& this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 1
-				&& bytes[offset + numBytes - 1] == CARRIAGE_RETURN){
+			&& this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 1
+			&& bytes[offset + numBytes - 1] == CARRIAGE_RETURN){
 			numBytes -= 1;
 		}
 
-		return new String(bytes, offset, numBytes, this.charsetName);
+		return new String(bytes, offset, numBytes, this.getCharset());
 	}
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
index e78232ac1e5..6ca0945d1cd 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
@@ -25,16 +25,20 @@
 
 import org.junit.Test;
 
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.OutputStreamWriter;
 import java.io.PrintStream;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -55,7 +59,7 @@ public void testSimpleRead() {
 			tempFile.deleteOnExit();
 			tempFile.setWritable(true);
 
-			PrintStream ps = new  PrintStream(tempFile);
+			PrintStream ps = new PrintStream(tempFile);
 			ps.println(first);
 			ps.println(second);
 			ps.close();
@@ -83,8 +87,7 @@ public void testSimpleRead() {
 			assertEquals(second, result);
 
 			assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result));
-		}
-		catch (Throwable t) {
+		} catch (Throwable t) {
 			System.err.println("test failed with exception: " + t.getMessage());
 			t.printStackTrace(System.err);
 			fail("Test erroneous");
@@ -93,11 +96,11 @@ public void testSimpleRead() {
 
 	@Test
 	public void testNestedFileRead() {
-		String[] dirs = new String[] {"tmp/first/", "tmp/second/"};
+		String[] dirs = new String[]{"tmp/first/", "tmp/second/"};
 		List<String> expectedFiles = new ArrayList<>();
 
 		try {
-			for (String dir: dirs) {
+			for (String dir : dirs) {
 				// create input file
 				File tmpDir = new File(dir);
 				if (!tmpDir.exists()) {
@@ -127,7 +130,7 @@ public void testNestedFileRead() {
 			FileInputSplit[] splits = inputFormat.createInputSplits(expectedFiles.size());
 
 			List<String> paths = new ArrayList<>();
-			for (FileInputSplit split: splits) {
+			for (FileInputSplit split : splits) {
 				paths.add(split.getPath().toString());
 			}
 
@@ -188,7 +191,7 @@ private void testRemovingTrailingCR(String lineBreaker, String delimiter) {
 
 			String result = "";
 			if ((delimiter.equals("\n") && (lineBreaker.equals("\n") || lineBreaker.equals("\r\n")))
-					|| (lineBreaker.equals(delimiter))){
+				|| (lineBreaker.equals(delimiter))) {
 
 				result = inputFormat.nextRecord("");
 				assertNotNull("Expecting first record here", result);
@@ -207,12 +210,250 @@ private void testRemovingTrailingCR(String lineBreaker, String delimiter) {
 				assertEquals(content, result);
 			}
 
+		} catch (Throwable t) {
+			System.err.println("test failed with exception: " + t.getMessage());
+			t.printStackTrace(System.err);
+			fail("Test erroneous");
 		}
-		catch (Throwable t) {
+	}
+
+	/**
+	 * Tests different file encodings, for example: UTF-8, UTF-8 with BOM, UTF-16LE, UTF-16BE, UTF-32LE, UTF-32BE.
+	 */
+	@Test
+	public void testFileCharset() {
+		String data = "Hello|ハロー|при\\вет|Bon^*|\\|<>|jour|Сайн. байна уу|안녕*하세요.";
+		// Default separator
+		testAllFileCharset(data);
+		// Specified separator
+		testAllFileCharset(data, "^*|\\|<>|");
+	}
+
+	private void testAllFileCharset(String data) {
+		testAllFileCharset(data, "");
+	}
+
+	private void testAllFileCharset(String data, String delimiter) {
+		try {
+			// test UTF-8, no bom, UTF-8
+			testFileCharset(data, "UTF-8", false, "UTF-8", delimiter);
+			// test UTF-8, have bom, UTF-8
+			testFileCharset(data, "UTF-8", true, "UTF-8", delimiter);
+			// test UTF-16BE, no, UTF-16
+			testFileCharset(data, "UTF-16BE", false, "UTF-16", delimiter);
+			// test UTF-16BE, yes, UTF-16
+			testFileCharset(data, "UTF-16BE", true, "UTF-16", delimiter);
+			// test UTF-16LE, no, UTF-16LE
+			testFileCharset(data, "UTF-16LE", false, "UTF-16LE", delimiter);
+			// test UTF-16LE, yes, UTF-16
+			testFileCharset(data, "UTF-16LE", true, "UTF-16", delimiter);
+			// test UTF-16BE, no, UTF-16BE
+			testFileCharset(data, "UTF-16BE", false, "UTF-16BE", delimiter);
+			// test UTF-16BE, yes, UTF-16LE
+			testFileCharset(data, "UTF-16BE", true, "UTF-16LE", delimiter);
+			// test UTF-16LE, yes, UTF-16BE
+			testFileCharset(data, "UTF-16LE", true, "UTF-16BE", delimiter);
+			// test UTF-32BE, no, UTF-32
+			testFileCharset(data, "UTF-32BE", false, "UTF-32", delimiter);
+			// test UTF-32BE, yes, UTF-32
+			testFileCharset(data, "UTF-32BE", true, "UTF-32", delimiter);
+			// test UTF-32LE, yes, UTF-32
+			testFileCharset(data, "UTF-32LE", true, "UTF-32", delimiter);
+			// test UTF-32LE, no, UTF-32LE
+			testFileCharset(data, "UTF-32LE", false, "UTF-32LE", delimiter);
+			// test UTF-32BE, no, UTF-32BE
+			testFileCharset(data, "UTF-32BE", false, "UTF-32BE", delimiter);
+			// test UTF-32BE, yes, UTF-32LE
+			testFileCharset(data, "UTF-32BE", true, "UTF-32LE", delimiter);
+			// test UTF-32LE, yes, UTF-32BE
+			testFileCharset(data, "UTF-32LE", true, "UTF-32BE", delimiter);
+			//------------------Don't set the charset------------------------
+			// test UTF-8, have bom, Don't set the charset
+			testFileCharset(data, "UTF-8", true, delimiter);
+			// test UTF-8, no bom, Don't set the charset
+			testFileCharset(data, "UTF-8", false, delimiter);
+			// test UTF-16BE, no bom, Don't set the charset
+			testFileCharset(data, "UTF-16BE", false, delimiter);
+			// test UTF-16BE, have bom, Don't set the charset
+			testFileCharset(data, "UTF-16BE", true, delimiter);
+			// test UTF-16LE, have bom, Don't set the charset
+			testFileCharset(data, "UTF-16LE", true, delimiter);
+			// test UTF-32BE, no bom, Don't set the charset
+			testFileCharset(data, "UTF-32BE", false, delimiter);
+			// test UTF-32BE, have bom, Don't set the charset
+			testFileCharset(data, "UTF-32BE", true, delimiter);
+			// test UTF-32LE, have bom, Don't set the charset
+			testFileCharset(data, "UTF-32LE", true, delimiter);
+		} catch (Throwable t) {
+			System.err.println("test failed with exception: " + t.getMessage());
+			t.printStackTrace(System.err);
+			fail("Test erroneous");
+		}
+	}
+
+	/**
+	 * Creates a temporary file containing the data in the given UTF encoding, optionally prefixed with a BOM.
+	 *
+	 * @param data
+	 * @param fileCharset
+	 * @param hasBom
+	 * @return
+	 */
+	private File createUTFEncodedFile(String data, String fileCharset, boolean hasBom) throws Exception {
+		BufferedWriter bw = null;
+		OutputStreamWriter osw = null;
+		FileOutputStream fos = null;
+
+		byte[] bom = new byte[]{};
+		if (hasBom) {
+			switch (fileCharset) {
+				case "UTF-8":
+					bom = new byte[]{(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+					break;
+				case "UTF-16":
+					bom = new byte[]{(byte) 0xFE, (byte) 0xFF};
+					break;
+				case "UTF-16LE":
+					bom = new byte[]{(byte) 0xFF, (byte) 0xFE};
+					break;
+				case "UTF-16BE":
+					bom = new byte[]{(byte) 0xFE, (byte) 0xFF};
+					break;
+				case "UTF-32":
+					bom = new byte[]{(byte) 0x00, (byte) 0x00, (byte) 0xFE, (byte) 0xFF};
+					break;
+				case "UTF-32LE":
+					bom = new byte[]{(byte) 0xFF, (byte) 0xFE, (byte) 0x00, (byte) 0x00};
+					break;
+				case "UTF-32BE":
+					bom = new byte[]{(byte) 0x00, (byte) 0x00, (byte) 0xFE, (byte) 0xFF};
+					break;
+				default:
+					throw new Exception("can not find the utf code");
+			}
+		}
+
+		// create input file
+		File tempFile = File.createTempFile("TextInputFormatTest", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+		fos = new FileOutputStream(tempFile, true);
+
+		if (tempFile.length() < 1) {
+			if (hasBom) {
+				fos.write(bom);
+			}
+		}
+
+		osw = new OutputStreamWriter(fos, fileCharset);
+		bw = new BufferedWriter(osw);
+		bw.write(data);
+		bw.newLine();
+
+		bw.close();
+		fos.close();
+
+		return tempFile;
+	}
+
+	private void testFileCharset(String data, String fileCharset, Boolean hasBom, String delimiter) {
+		testFileCharset(data, fileCharset, hasBom, null, delimiter);
+	}
+
+	private void testFileCharset(String data, String fileCharset, Boolean hasBom, String specifiedCharset, String delimiter) {
+		try {
+			int offset = 0;
+			String carriageReturn = java.security.AccessController.doPrivileged(
+				new sun.security.action.GetPropertyAction("line.separator"));
+			String delimiterString = delimiter.isEmpty() ? carriageReturn : delimiter;
+			byte[] delimiterBytes = delimiterString.getBytes(fileCharset);
+			String[] utfArray = {"UTF-8", "UTF-16", "UTF-16LE", "UTF-16BE"};
+			if (hasBom) {
+				if (Arrays.asList(utfArray).contains(fileCharset)) {
+					offset = 1;
+				}
+			}
+
+			File tempFile = createUTFEncodedFile(data, fileCharset, hasBom);
+
+			TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
+			if (specifiedCharset != null) {
+				inputFormat.setCharsetName(specifiedCharset);
+			}
+			if (delimiterBytes.length > 0) {
+				inputFormat.setDelimiter(delimiterBytes);
+			}
+
+			Configuration parameters = new Configuration();
+			inputFormat.configure(parameters);
+
+			FileInputSplit[] splits = inputFormat.createInputSplits(1);
+			assertTrue("expected at least one input split", splits.length >= 1);
+			inputFormat.open(splits[0]);
+
+			String result = "";
+			int i = 0;
+			data = data + carriageReturn;
+			String delimiterStr = new String(delimiterBytes, 0, delimiterBytes.length, fileCharset);
+			String[] strArr = data.split(delimiterStr
+				.replace("\\", "\\\\")
+				.replace("^", "\\^")
+				.replace("|", "\\|")
+				.replace("[", "\\[")
+				.replace("*", "\\*")
+				.replace(".", "\\.")
+			);
+
+			while (!inputFormat.reachedEnd()) {
+				if (i < strArr.length) {
+					result = inputFormat.nextRecord("");
+					if (i == 0) {
+						result = result.substring(offset);
+					}
+					if (Charset.forName(fileCharset) != inputFormat.getCharset()) {
+						assertNotEquals(strArr[i], result);
+					} else {
+						assertEquals(strArr[i], result);
+					}
+					i++;
+				} else {
+					inputFormat.nextRecord("");
+				}
+			}
+			assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result));
+		} catch (Throwable t) {
 			System.err.println("test failed with exception: " + t.getMessage());
 			t.printStackTrace(System.err);
 			fail("Test erroneous");
 		}
 	}
 
+	@Test
+	public void testFileCharsetReadByMultiSplits() {
+		String carriageReturn = java.security.AccessController.doPrivileged(
+			new sun.security.action.GetPropertyAction("line.separator"));
+		final String data = "First line" + carriageReturn + "Second line";
+		try {
+			File tempFile = createUTFEncodedFile(data, "UTF-16", false);
+
+			TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
+			inputFormat.setCharsetName("UTF-32");
+
+			Configuration parameters = new Configuration();
+			inputFormat.configure(parameters);
+
+			FileInputSplit[] splits = inputFormat.createInputSplits(3);
+			assertTrue("expected at least one input split", splits.length >= 1);
+			String result = "";
+			for (FileInputSplit split : splits) {
+				inputFormat.open(split);
+				result = inputFormat.nextRecord("");
+			}
+			assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result));
+		} catch (Throwable t) {
+			System.err.println("test failed with exception: " + t.getMessage());
+			t.printStackTrace(System.err);
+			fail("Test erroneous");
+		}
+	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services