You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/08 20:23:23 UTC

[1/4] flink git commit: [FLINK-5226] [table] Use correct DataSetCostFactory and improve DataSetCalc costs.

Repository: flink
Updated Branches:
  refs/heads/master 55d60615a -> 41d5875bf


[FLINK-5226] [table] Use correct DataSetCostFactory and improve DataSetCalc costs.

- Improved DataSetCalc costs make projections cheap and help to push them down.

This closes #2926.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/677d0d90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/677d0d90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/677d0d90

Branch: refs/heads/master
Commit: 677d0d9073952b6f4c745ac242ba4108364f2189
Parents: 55d6061
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Dec 2 15:28:16 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 8 18:45:42 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/FlinkRelBuilder.scala       |  2 +-
 .../table/plan/nodes/dataset/DataSetCalc.scala  | 14 ++++++--
 .../table/plan/nodes/dataset/DataSetJoin.scala  | 17 ++++++----
 .../api/scala/batch/sql/SetOperatorsTest.scala  | 14 ++++++--
 .../api/scala/batch/sql/SingleRowJoinTest.scala | 34 ++++++++++++--------
 5 files changed, 55 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
index da44ebb..8508e53 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
@@ -79,7 +79,7 @@ object FlinkRelBuilder {
     val typeFactory = new FlinkTypeFactory(typeSystem)
 
     // create context instances with Flink type factory
-    val planner = new VolcanoPlanner(Contexts.empty())
+    val planner = new VolcanoPlanner(config.getCostFactory, Contexts.empty())
     planner.setExecutor(config.getExecutor)
     planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
     val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))

http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
index b8b74ad..c0881b7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
@@ -32,6 +32,8 @@ import TypeConverter._
 import org.apache.flink.api.table.BatchTableEnvironment
 import org.apache.calcite.rex._
 
+import scala.collection.JavaConverters._
+
 /**
   * Flink RelNode which matches along with LogicalCalc.
   *
@@ -73,8 +75,16 @@ class DataSetCalc(
 
     val child = this.getInput
     val rowCnt = metadata.getRowCount(child)
-    val exprCnt = calcProgram.getExprCount
-    planner.getCostFactory.makeCost(rowCnt, rowCnt * exprCnt, 0)
+
+    // compute number of expressions that do not access a field or literal, i.e. computations,
+    //   conditions, etc. We only want to account for computations, not for simple projections.
+    val compCnt = calcProgram.getExprList.asScala.toList.count {
+      case i: RexInputRef => false
+      case l: RexLiteral => false
+      case _ => true
+    }
+
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * compCnt, 0)
   }
 
   override def estimateRowCount(metadata: RelMetadataQuery): Double = {

http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index 6d7a30e..ccd84ca 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -88,12 +88,17 @@ class DataSetJoin(
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
 
-    val children = this.getInputs
-    children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
-      val rowCnt = metadata.getRowCount(child)
-      val rowSize = this.estimateRowSize(child.getRowType)
-      cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
-    }
+    val leftRowCnt = metadata.getRowCount(getLeft)
+    val leftRowSize = estimateRowSize(getLeft.getRowType)
+
+    val rightRowCnt = metadata.getRowCount(getRight)
+    val rightRowSize = estimateRowSize(getRight.getRowType)
+
+    val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+    val cpuCost = leftRowCnt + rightRowCnt
+    val rowCnt = leftRowCnt + rightRowCnt
+
+    planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
   }
 
   override def translateToPlan(

http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
index 7b2b497..d0c0400 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
@@ -43,15 +43,23 @@ class SetOperatorsTest extends TableTestBase {
             "DataSetCalc",
             binaryNode(
               "DataSetJoin",
-              batchTableNode(1),
+              unaryNode(
+                "DataSetCalc",
+                batchTableNode(1),
+                term("select", "b_long")
+              ),
               unaryNode(
                 "DataSetAggregate",
-                batchTableNode(0),
+                unaryNode(
+                  "DataSetCalc",
+                  batchTableNode(0),
+                  term("select", "a_long")
+                ),
                 term("groupBy", "a_long"),
                 term("select", "a_long")
               ),
               term("where", "=(a_long, b_long)"),
-              term("join", "b_long", "b_int", "b_string", "a_long"),
+              term("join", "b_long", "a_long"),
               term("joinType", "InnerJoin")
             ),
             term("select", "true AS $f0", "a_long")

http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala
index f56b9ae..49f61af 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala
@@ -48,20 +48,23 @@ class SingleRowJoinTest extends TableTestBase {
               "DataSetUnion",
               unaryNode(
                 "DataSetValues",
-                batchTableNode(0),
-                tuples(List(null, null)),
-                term("values", "a1", "a2")
+                unaryNode(
+                  "DataSetCalc",
+                  batchTableNode(0),
+                  term("select", "a1")
+                ),
+                tuples(List(null)),
+                term("values", "a1")
               ),
-              term("union","a1","a2")
+              term("union","a1")
             ),
             term("select", "COUNT(a1) AS cnt")
           ),
-          term("where", "true"),
+          term("where", "=(CAST(a1), cnt)"),
           term("join", "a1", "a2", "cnt"),
           term("joinType", "NestedLoopJoin")
         ),
-        term("select", "a1", "a2"),
-        term("where", "=(CAST(a1), cnt)")
+        term("select", "a1", "a2")
       )
 
     util.verifySql(query, expected)
@@ -89,20 +92,23 @@ class SingleRowJoinTest extends TableTestBase {
               "DataSetUnion",
               unaryNode(
                 "DataSetValues",
-                batchTableNode(0),
-                tuples(List(null, null)),
-                term("values", "a1", "a2")
+                unaryNode(
+                  "DataSetCalc",
+                  batchTableNode(0),
+                  term("select", "a1")
+                ),
+                tuples(List(null)),
+                term("values", "a1")
               ),
-              term("union","a1","a2")
+              term("union", "a1")
             ),
             term("select", "COUNT(a1) AS cnt")
           ),
-          term("where", "true"),
+          term("where", "<(a1, cnt)"),
           term("join", "a1", "a2", "cnt"),
           term("joinType", "NestedLoopJoin")
         ),
-        term("select", "a1", "a2"),
-        term("where", "<(a1, cnt)")
+        term("select", "a1", "a2")
       )
 
     util.verifySql(query, expected)


[3/4] flink git commit: [FLINK-3921] Add support to set encoding in CsvReader and StringParser.

Posted by fh...@apache.org.
[FLINK-3921] Add support to set encoding in CsvReader and StringParser.

This closes #2060.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2186af6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2186af6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2186af6

Branch: refs/heads/master
Commit: f2186af6702c9fe48c91d5c2d7748378984cd29b
Parents: 2d8f03e
Author: Joshi <re...@gmail.com>
Authored: Wed Jun 1 13:26:47 2016 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 8 18:47:36 2016 +0100

----------------------------------------------------------------------
 .../api/common/io/GenericCsvInputFormat.java    | 64 ++++++++++----------
 .../apache/flink/types/parser/FieldParser.java  | 38 +++++++++---
 .../apache/flink/types/parser/StringParser.java |  8 +--
 .../common/io/GenericCsvInputFormatTest.java    | 40 ++++++++++--
 .../types/parser/VarLengthStringParserTest.java | 20 ++++++
 .../org/apache/flink/api/java/io/CsvReader.java | 24 +++++++-
 .../apache/flink/api/java/io/CSVReaderTest.java |  9 +++
 7 files changed, 155 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 85d9cd8..0ced22b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -25,14 +25,12 @@ import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.types.parser.StringParser;
 import org.apache.flink.types.parser.StringValueParser;
 import org.apache.flink.util.InstantiationUtil;
-
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.UnsupportedCharsetException;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.TreeMap;
@@ -48,9 +46,9 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 	
 	private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
 
-	/** The default charset  to convert strings to bytes */
-	private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
-	
+	/** The charset used to convert strings to bytes */
+	private Charset charset = Charset.forName("UTF-8");
+
 	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
 	
 	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
@@ -107,6 +105,11 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 		super(filePath, null);
 	}
 
+	protected GenericCsvInputFormat(Path filePath, Charset charset) {
+		this(filePath);
+		this.charset = Preconditions.checkNotNull(charset);
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	public int getNumberOfFieldsTotal() {
@@ -121,32 +124,11 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 		return commentPrefix;
 	}
 
-	public void setCommentPrefix(byte[] commentPrefix) {
-		this.commentPrefix = commentPrefix;
-	}
-
-	public void setCommentPrefix(char commentPrefix) {
-		setCommentPrefix(String.valueOf(commentPrefix));
-	}
-
 	public void setCommentPrefix(String commentPrefix) {
-		setCommentPrefix(commentPrefix, UTF_8_CHARSET);
+		setCommentPrefix(commentPrefix, charset);
 	}
 
-	public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
-		if (charsetName == null) {
-			throw new IllegalArgumentException("Charset name must not be null");
-		}
-
-		if (commentPrefix != null) {
-			Charset charset = Charset.forName(charsetName);
-			setCommentPrefix(commentPrefix, charset);
-		} else {
-			this.commentPrefix = null;
-		}
-	}
-
-	public void setCommentPrefix(String commentPrefix, Charset charset) {
+	private void setCommentPrefix(String commentPrefix, Charset charset) {
 		if (charset == null) {
 			throw new IllegalArgumentException("Charset must not be null");
 		}
@@ -174,7 +156,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 	}
 
 	public void setFieldDelimiter(String delimiter) {
-		this.fieldDelim = delimiter.getBytes(UTF_8_CHARSET);
+		this.fieldDelim = delimiter.getBytes(charset);
 	}
 
 	public boolean isLenient() {
@@ -314,6 +296,25 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 		this.fieldIncluded = includedMask;
 	}
 
+	/**
+	 * Gets the character set for the parser. Default is set to UTF-8.
+	 *
+	 * @return The charset for the parser.
+	 */
+	Charset getCharset() {
+		return this.charset;
+	}
+
+	/**
+	 * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset
+	 * when doing a parse.
+	 *
+	 * @param charset The character set to set.
+	 */
+	public void setCharset(Charset charset) {
+		this.charset = Preconditions.checkNotNull(charset);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Runtime methods
 	// --------------------------------------------------------------------------------------------
@@ -334,6 +335,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 
 				FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class);
 
+				p.setCharset(this.getCharset());
 				if (this.quotedStringParsing) {
 					if (p instanceof StringParser) {
 						((StringParser)p).enableQuotedStringParsing(this.quoteCharacter);
@@ -449,7 +451,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 			// search for ending quote character, continue when it is escaped
 			i++;
 
-			while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)){
+			while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)) {
 				i++;
 			}
 			i++;

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index 200d239..d9eeecc 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -19,11 +19,6 @@
 
 package org.apache.flink.types.parser;
 
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.types.BooleanValue;
 import org.apache.flink.types.ByteValue;
@@ -34,6 +29,12 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.ShortValue;
 import org.apache.flink.types.StringValue;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * A FieldParser is used parse a field from a sequence of bytes. Fields occur in a byte sequence and are terminated
  * by the end of the byte sequence or a delimiter.
@@ -77,9 +78,11 @@ public abstract class FieldParser<T> {
 		/** Invalid Boolean value **/
 		BOOLEAN_INVALID
 	}
-	
+
+	private Charset charset = Charset.forName("UTF-8");
+
 	private ParseErrorState errorState = ParseErrorState.NONE;
-	
+
 	/**
 	 * Parses the value of a field from the byte array, taking care of properly reset
 	 * the state of this parser.
@@ -217,7 +220,26 @@ public abstract class FieldParser<T> {
 
 		return limitedLength;
 	}
-	
+
+	/**
+	 * Gets the charset for the parser. Default is set to UTF-8.
+	 *
+	 * @return The charset for the parser.
+	 */
+	public Charset getCharset() {
+		return this.charset;
+	}
+
+	/**
+	 * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset
+	 * when doing a parse.
+	 *
+	 * @param charset The charset to set.
+	 */
+	public void setCharset(Charset charset) {
+		this.charset = charset;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Mapping from types to parsers
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
index 1a2c7e3..7b46a7e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
@@ -63,11 +63,11 @@ public class StringParser extends FieldParser<String> {
 				// check for proper termination
 				if (i == limit) {
 					// either by end of line
-					this.result = new String(bytes, startPos + 1, i - startPos - 2);
+					this.result = new String(bytes, startPos + 1, i - startPos - 2, getCharset());
 					return limit;
 				} else if ( i < delimLimit && delimiterNext(bytes, i, delimiter)) {
 					// or following field delimiter
-					this.result = new String(bytes, startPos + 1, i - startPos - 2);
+					this.result = new String(bytes, startPos + 1, i - startPos - 2, getCharset());
 					return i + delimiter.length;
 				} else {
 					// no proper termination
@@ -87,14 +87,14 @@ public class StringParser extends FieldParser<String> {
 				if (limit == startPos) {
 					setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
 				}
-				this.result = new String(bytes, startPos, limit - startPos);
+				this.result = new String(bytes, startPos, limit - startPos, getCharset());
 				return limit;
 			} else {
 				// delimiter found.
 				if (i == startPos) {
 					setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
 				}
-				this.result = new String(bytes, startPos, i - startPos);
+				this.result = new String(bytes, startPos, i - startPos, getCharset());
 				return i + delimiter.length;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
index e3215c6..d063ddc 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
@@ -28,6 +28,7 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.GZIPOutputStream;
@@ -485,7 +486,7 @@ public class GenericCsvInputFormatTest {
 				fail("Input format accepted on invalid input.");
 			}
 			catch (ParseException e) {
-				; // all good
+				// all good
 			}
 		}
 		catch (Exception ex) {
@@ -547,7 +548,38 @@ public class GenericCsvInputFormatTest {
 			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
 		}
 	}
-	
+
+	@Test
+	public void testReadWithCharset() throws IOException {
+		try {
+			final String fileContent = "\u00bf|Flink|\u00f1";
+			final FileInputSplit split = createTempFile(fileContent);
+
+			final Configuration parameters = new Configuration();
+
+			format.setCharset(Charset.forName("UTF-8"));
+			format.setFieldDelimiter("|");
+			format.setFieldTypesGeneric(StringValue.class, StringValue.class, StringValue.class);
+
+			format.configure(parameters);
+			format.open(split);
+
+			Value[] values = new Value[] { new StringValue(), new StringValue(), new StringValue()};
+
+			values = format.nextRecord(values);
+			assertNotNull(values);
+			assertEquals("\u00bf", ((StringValue) values[0]).getValue());
+			assertEquals("Flink", ((StringValue) values[1]).getValue());
+			assertEquals("\u00f1", ((StringValue) values[2]).getValue());
+
+			assertNull(format.nextRecord(values));
+			assertTrue(format.reachedEnd());
+		}
+		catch (Exception ex) {
+			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+		}
+	}
+
 	@Test
 	public void readWithEmptyField() {
 		try {
@@ -722,7 +754,7 @@ public class GenericCsvInputFormatTest {
 		return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
 	}
 	
-	private final Value[] createIntValues(int num) {
+	private Value[] createIntValues(int num) {
 		Value[] v = new Value[num];
 		
 		for (int i = 0; i < num; i++) {
@@ -732,7 +764,7 @@ public class GenericCsvInputFormatTest {
 		return v;
 	}
 	
-	private final Value[] createLongValues(int num) {
+	private Value[] createLongValues(int num) {
 		Value[] v = new Value[num];
 		
 		for (int i = 0; i < num; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
index 1fe8850..1c5579e 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
 import org.junit.Test;
 
+import java.nio.charset.Charset;
+
 public class VarLengthStringParserTest {
 
 	public StringValueParser parser = new StringValueParser();
@@ -194,4 +196,22 @@ public class VarLengthStringParserTest {
 		startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
 		assertTrue(startPos < 0);
 	}
+
+	@Test
+	public void testParseValidMixedStringsWithCharset() {
+
+		Charset charset = Charset.forName("US-ASCII");
+		this.parser = new StringValueParser();
+		this.parser.enableQuotedStringParsing((byte) '@');
+
+		// check valid strings without whitespace and trailing delimiter
+		byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes();
+		StringValue s = new StringValue();
+
+		int startPos = 0;
+		parser.setCharset(charset);
+		startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[]{'|'}, s);
+		assertTrue(startPos == 11);
+		assertTrue(s.getValue().equals("abcde|gh"));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index 8be5dc2..b13b8aa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.io;
 
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 
@@ -64,6 +65,8 @@ public class CsvReader {
 	protected boolean skipFirstLineAsHeader = false;
 	
 	protected boolean ignoreInvalidLines = false;
+
+	private Charset charset = Charset.forName("UTF-8");
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -157,7 +160,25 @@ public class CsvReader {
 		this.commentPrefix = commentPrefix;
 		return this;
 	}
-	
+
+	/**
+	 * Gets the character set for the reader. Default is set to UTF-8.
+	 *
+	 * @return The charset for the reader.
+	 */
+	public Charset getCharset() {
+		return this.charset;
+	}
+
+	/**
+	 * Sets the charset of the reader.
+	 *
+	 * @param charset The character set to set.
+	 */
+	public void setCharset(Charset charset) {
+		this.charset = Preconditions.checkNotNull(charset);
+	}
+
 	/**
 	 * Configures which fields of the CSV file should be included and which should be skipped. The
 	 * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
@@ -340,6 +361,7 @@ public class CsvReader {
 		format.setCommentPrefix(this.commentPrefix);
 		format.setSkipFirstLineAsHeader(skipFirstLineAsHeader);
 		format.setLenient(ignoreInvalidLines);
+		format.setCharset(this.charset);
 		if (this.parseQuotedStrings) {
 			format.enableQuotedStringParsing(this.quoteCharacter);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
index 8b12315..e1c8023 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.Arrays;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -75,6 +76,14 @@ public class CSVReaderTest {
 		reader.ignoreComments("#");
 		assertEquals("#", reader.commentPrefix);
 	}
+
+	@Test
+	public void testCharset() {
+		CsvReader reader = getCsvReader();
+		assertEquals(reader.getCharset(), Charset.forName("UTF-8"));
+		reader.setCharset(Charset.forName("US-ASCII"));
+		assertEquals(reader.getCharset(), Charset.forName("US-ASCII"));
+	}
 	
 	@Test
 	public void testIncludeFieldsDense() {


[4/4] flink git commit: [FLINK-3921] Add support to set encoding in CsvReader and StringParser.

Posted by fh...@apache.org.
[FLINK-3921] Add support to set encoding in CsvReader and StringParser.

- extends first commit.

This closes #2901.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/41d5875b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/41d5875b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/41d5875b

Branch: refs/heads/master
Commit: 41d5875bfc272f2cd5c7e8c8523036684865c1ce
Parents: f2186af
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Nov 28 12:43:47 2016 -0500
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 8 21:21:48 2016 +0100

----------------------------------------------------------------------
 .../api/common/io/DelimitedInputFormat.java     |  67 ++++++++--
 .../api/common/io/GenericCsvInputFormat.java    |  79 +++++-------
 .../apache/flink/types/parser/FieldParser.java  |  18 ++-
 .../api/common/io/DelimitedInputFormatTest.java |  94 ++++++++++----
 .../common/io/GenericCsvInputFormatTest.java    | 125 ++++++++++---------
 .../types/parser/VarLengthStringParserTest.java |  12 +-
 .../org/apache/flink/api/java/io/CsvReader.java |  24 ++--
 .../apache/flink/api/java/io/CSVReaderTest.java |  23 ++--
 .../flink/api/java/io/CsvInputFormatTest.java   |   6 +-
 .../runtime/io/RowCsvInputFormatTest.scala      |   6 +-
 .../flink/api/scala/io/CsvInputFormatTest.scala |  14 +--
 11 files changed, 273 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index fd02c82..5c8dfc1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -20,17 +20,18 @@ package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -56,8 +57,11 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 	 */
 	private static final Logger LOG = LoggerFactory.getLogger(DelimitedInputFormat.class);
 
-	/** The default charset  to convert strings to bytes */
-	private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
+	// The charset used to convert strings to bytes
+	private String charsetName = "UTF-8";
+
+	// Charset is not serializable
+	private transient Charset charset;
 
 	/**
 	 * The default read buffer size = 1MB.
@@ -157,9 +161,12 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 	// --------------------------------------------------------------------------------------------
 	//  The configuration parameters. Configured on the instance and serialized to be shipped.
 	// --------------------------------------------------------------------------------------------
-	
+
+	// The delimiter may be set with a byte-sequence or a String. In the latter
+	// case the byte representation is updated consistent with current charset.
 	private byte[] delimiter = new byte[] {'\n'};
-	
+	private String delimiterString = null;
+
 	private int lineLengthLimit = Integer.MAX_VALUE;
 	
 	private int bufferSize = -1;
@@ -182,8 +189,42 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 		}
 		loadConfigParameters(configuration);
 	}
-	
-	
+
+	/**
+	 * Get the character set used for the row delimiter. This is also used by
+	 * subclasses to interpret field delimiters, comment strings, and for
+	 * configuring {@link FieldParser}s.
+	 *
+	 * @return the charset
+	 */
+	@PublicEvolving
+	public Charset getCharset() {
+		if (this.charset == null) {
+			this.charset = Charset.forName(charsetName);
+		}
+		return this.charset;
+	}
+
+	/**
+	 * Set the name of the character set used for the row delimiter. This is
+	 * also used by subclasses to interpret field delimiters, comment strings,
+	 * and for configuring {@link FieldParser}s.
+	 *
+	 * These fields are interpreted when set. Changing the charset thereafter
+	 * may cause unexpected results.
+	 *
+	 * @param charset name of the charset
+	 */
+	@PublicEvolving
+	public void setCharset(String charset) {
+		this.charsetName = Preconditions.checkNotNull(charset);
+		this.charset = null;
+
+		if (this.delimiterString != null) {
+			this.delimiter = delimiterString.getBytes(getCharset());
+		}
+	}
+
 	public byte[] getDelimiter() {
 		return delimiter;
 	}
@@ -193,6 +234,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 			throw new IllegalArgumentException("Delimiter must not be null");
 		}
 		this.delimiter = delimiter;
+		this.delimiterString = null;
 	}
 
 	public void setDelimiter(char delimiter) {
@@ -203,7 +245,8 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 		if (delimiter == null) {
 			throw new IllegalArgumentException("Delimiter must not be null");
 		}
-		this.delimiter = delimiter.getBytes(UTF_8_CHARSET);
+		this.delimiter = delimiter.getBytes(getCharset());
+		this.delimiterString = delimiter;
 	}
 	
 	public int getLineLengthLimit() {
@@ -264,7 +307,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 	// --------------------------------------------------------------------------------------------
 	
 	/**
-	 * Configures this input format by reading the path to the file from the configuration andge the string that
+	 * Configures this input format by reading the path to the file from the configuration and the string that
 	 * defines the record delimiter.
 	 * 
 	 * @param parameters The configuration object to read the parameters from.

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 0ced22b..20c643e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -25,12 +25,10 @@ import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.types.parser.StringParser;
 import org.apache.flink.types.parser.StringValueParser;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.TreeMap;
@@ -46,9 +44,6 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 	
 	private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
 
-	/** The charset used to convert strings to bytes */
-	private Charset charset = Charset.forName("UTF-8");
-
 	private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
 	
 	private static final boolean[] EMPTY_INCLUDED = new boolean[0];
@@ -79,9 +74,12 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 	private Class<?>[] fieldTypes = EMPTY_TYPES;
 	
 	protected boolean[] fieldIncluded = EMPTY_INCLUDED;
-		
+
+	// The byte representation of the delimiter is updated consistent with
+	// current charset.
 	private byte[] fieldDelim = DEFAULT_FIELD_DELIMITER;
-	
+	private String fieldDelimString = null;
+
 	private boolean lenient;
 	
 	private boolean skipFirstLineAsHeader;
@@ -90,7 +88,10 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 
 	private byte quoteCharacter;
 
+	// The byte representation of the comment prefix is updated consistent with
+	// current charset.
 	protected byte[] commentPrefix = null;
+	private String commentPrefixString = null;
 
 
 	// --------------------------------------------------------------------------------------------
@@ -105,11 +106,6 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 		super(filePath, null);
 	}
 
-	protected GenericCsvInputFormat(Path filePath, Charset charset) {
-		this(filePath);
-		this.charset = Preconditions.checkNotNull(charset);
-	}
-
 	// --------------------------------------------------------------------------------------------
 
 	public int getNumberOfFieldsTotal() {
@@ -120,43 +116,43 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 		return this.fieldTypes.length;
 	}
 
+	@Override
+	public void setCharset(String charset) {
+		super.setCharset(charset);
+
+		if (this.fieldDelimString != null) {
+			this.fieldDelim = fieldDelimString.getBytes(getCharset());
+		}
+
+		if (this.commentPrefixString != null) {
+			this.commentPrefix = commentPrefixString.getBytes(getCharset());
+		}
+	}
+
 	public byte[] getCommentPrefix() {
 		return commentPrefix;
 	}
 
 	public void setCommentPrefix(String commentPrefix) {
-		setCommentPrefix(commentPrefix, charset);
-	}
-
-	private void setCommentPrefix(String commentPrefix, Charset charset) {
-		if (charset == null) {
-			throw new IllegalArgumentException("Charset must not be null");
-		}
 		if (commentPrefix != null) {
-			this.commentPrefix = commentPrefix.getBytes(charset);
+			this.commentPrefix = commentPrefix.getBytes(getCharset());
 		} else {
 			this.commentPrefix = null;
 		}
+		this.commentPrefixString = commentPrefix;
 	}
 
 	public byte[] getFieldDelimiter() {
 		return fieldDelim;
 	}
 
-	public void setFieldDelimiter(byte[] delimiter) {
+	public void setFieldDelimiter(String delimiter) {
 		if (delimiter == null) {
 			throw new IllegalArgumentException("Delimiter must not be null");
 		}
 
-		this.fieldDelim = delimiter;
-	}
-
-	public void setFieldDelimiter(char delimiter) {
-		setFieldDelimiter(String.valueOf(delimiter));
-	}
-
-	public void setFieldDelimiter(String delimiter) {
-		this.fieldDelim = delimiter.getBytes(charset);
+		this.fieldDelim = delimiter.getBytes(getCharset());
+		this.fieldDelimString = delimiter;
 	}
 
 	public boolean isLenient() {
@@ -296,25 +292,6 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 		this.fieldIncluded = includedMask;
 	}
 
-	/**
-	 * Gets the character set for the parser. Default is set to UTF-8.
-	 *
-	 * @return The charset for the parser.
-	 */
-	Charset getCharset() {
-		return this.charset;
-	}
-
-	/**
-	 * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset
-	 * when doing a parse.
-	 *
-	 * @param charset The character set to set.
-	 */
-	public void setCharset(Charset charset) {
-		this.charset = Preconditions.checkNotNull(charset);
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Runtime methods
 	// --------------------------------------------------------------------------------------------
@@ -322,7 +299,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 	@Override
 	public void open(FileInputSplit split) throws IOException {
 		super.open(split);
-		
+
 		// instantiate the parsers
 		FieldParser<?>[] parsers = new FieldParser<?>[fieldTypes.length];
 		
@@ -335,7 +312,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 
 				FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class);
 
-				p.setCharset(this.getCharset());
+				p.setCharset(getCharset());
 				if (this.quotedStringParsing) {
 					if (p instanceof StringParser) {
 						((StringParser)p).enableQuotedStringParsing(this.quoteCharacter);

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index d9eeecc..cf3c83d 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -32,6 +32,7 @@ import org.apache.flink.types.StringValue;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -79,7 +80,7 @@ public abstract class FieldParser<T> {
 		BOOLEAN_INVALID
 	}
 
-	private Charset charset = Charset.forName("UTF-8");
+	private Charset charset = StandardCharsets.UTF_8;
 
 	private ParseErrorState errorState = ParseErrorState.NONE;
 
@@ -105,9 +106,7 @@ public abstract class FieldParser<T> {
 
 	/**
 	 * Each parser's logic should be implemented inside this method
-	 *
-	 * @see {@link FieldParser#parseField(byte[], int, int, byte[], Object)}
-	 * */
+	 */
 	protected abstract int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse);
 
 	/**
@@ -221,20 +220,19 @@ public abstract class FieldParser<T> {
 		return limitedLength;
 	}
 
-	/*
-	 * Gets the Charset for the parser.Default is set to ASCII
+	/**
+	 * Gets the character set used for this parser.
 	 *
-	 * @return The charset for the parser.
+	 * @return the charset used for this parser.
 	 */
 	public Charset getCharset() {
 		return this.charset;
 	}
 
 	/**
-	 * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset
-	 * when doing a parse.
+	 * Sets the character set used for this parser.
 	 *
-	 * @param charset The charset  to set.
+	 * @param charset charset used for this parser.
 	 */
 	public void setCharset(Charset charset) {
 		this.charset = charset;

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 93d5f9f..219365a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.api.common.io;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -33,17 +34,17 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class DelimitedInputFormatTest {
 	
@@ -200,6 +201,45 @@ public class DelimitedInputFormatTest {
 		}
 	}
 
+	@Test
+	public void testReadCustomDelimiterWithCharset() throws IOException {
+		// Unicode row fragments
+		String[] records = new String[]{"\u020e\u021f\u05c0\u020b\u020f", "Apache", "\nFlink", "\u0000", "\u05c0"};
+
+		// Unicode delimiter
+		String delimiter = "\u05c0\u05c0";
+
+		String fileContent = StringUtils.join(records, delimiter);
+
+		for (final String charset : new String[]{ "UTF-8", "UTF-16BE", "UTF-16LE" }) {
+			// use charset when instantiating the record String
+			DelimitedInputFormat<String> format = new DelimitedInputFormat<String>() {
+				@Override
+				public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+					return new String(bytes, offset, numBytes, charset);
+				}
+			};
+			format.setFilePath("file:///some/file/that/will/not/be/read");
+
+			final FileInputSplit split = createTempFile(fileContent, charset);
+
+			format.setDelimiter(delimiter);
+			// use the same encoding to parse the file as used to read the file;
+			// the delimiter is reinterpreted when the charset is set
+			format.setCharset(charset);
+			format.configure(new Configuration());
+			format.open(split);
+
+			for (String record : records) {
+				String value = format.nextRecord(null);
+				assertEquals(record, value);
+			}
+
+			assertNull(format.nextRecord(null));
+			assertTrue(format.reachedEnd());
+		}
+	}
+
 	/**
 	 * Tests that the records are read correctly when the split boundary is in the middle of a record.
 	 */
@@ -363,19 +403,29 @@ public class DelimitedInputFormatTest {
 			fail(e.getMessage());
 		}
 	}
-	
-	private static FileInputSplit createTempFile(String contents) throws IOException {
+
+	static FileInputSplit createTempFile(String contents) throws IOException {
 		File tempFile = File.createTempFile("test_contents", "tmp");
 		tempFile.deleteOnExit();
-		
-		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
-		wrt.write(contents);
-		wrt.close();
-		
+
+		try (Writer out = new OutputStreamWriter(new FileOutputStream(tempFile))) {
+			out.write(contents);
+		}
+
 		return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
 	}
-	
-	
+
+	static FileInputSplit createTempFile(String contents, String charset) throws IOException {
+		File tempFile = File.createTempFile("test_contents", "tmp");
+		tempFile.deleteOnExit();
+
+		try (Writer out = new OutputStreamWriter(new FileOutputStream(tempFile), charset)) {
+			out.write(contents);
+		}
+
+		return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
+	}
+
 	protected static final class MyTextInputFormat extends DelimitedInputFormat<String> {
 		private static final long serialVersionUID = 1L;
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
index d063ddc..c11a573 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
@@ -18,21 +18,7 @@
 
 package org.apache.flink.api.common.io;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.GZIPOutputStream;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -41,15 +27,29 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.Arrays;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static org.apache.flink.api.common.io.DelimitedInputFormatTest.createTempFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class GenericCsvInputFormatTest {
 
-	private File tempFile;
-	
 	private TestCsvInputFormat format;
 	
 	// --------------------------------------------------------------------------------------------
@@ -65,9 +65,6 @@ public class GenericCsvInputFormatTest {
 		if (this.format != null) {
 			this.format.close();
 		}
-		if (this.tempFile != null) {
-			this.tempFile.delete();
-		}
 	}
 	
 	@Test
@@ -87,7 +84,7 @@ public class GenericCsvInputFormatTest {
 	public void testReadNoPosAll() throws IOException {
 		try {
 			final String fileContent = "111|222|333|444|555\n666|777|888|999|000|";
-			final FileInputSplit split = createTempFile(fileContent);	
+			final FileInputSplit split = createTempFile(fileContent);
 		
 			final Configuration parameters = new Configuration();
 			
@@ -485,8 +482,7 @@ public class GenericCsvInputFormatTest {
 				format.nextRecord(values);
 				fail("Input format accepted on invalid input.");
 			}
-			catch (ParseException e) {
-				// all good
+			catch (ParseException ignored) {
 			}
 		}
 		catch (Exception ex) {
@@ -551,40 +547,64 @@ public class GenericCsvInputFormatTest {
 
 	@Test
 	public void testReadWithCharset() throws IOException {
-		try {
-			final String fileContent = "\u00bf|Flink|\u00f1";
-			final FileInputSplit split = createTempFile(fileContent);
+		// Unicode row fragments
+		String[] records = new String[]{"\u020e\u021f", "Flink", "\u020b\u020f"};
 
-			final Configuration parameters = new Configuration();
+		// Unicode delimiter
+		String delimiter = "\u05c0\u05c0";
 
-			format.setCharset(Charset.forName("UTF-8"));
-			format.setFieldDelimiter("|");
-			format.setFieldTypesGeneric(StringValue.class, StringValue.class, StringValue.class);
+		String fileContent = StringUtils.join(records, delimiter);
 
-			format.configure(parameters);
-			format.open(split);
+		// StringValueParser does not use charset so rely on StringParser
+		GenericCsvInputFormat<String[]> format = new GenericCsvInputFormat<String[]>() {
+			@Override
+			public String[] readRecord(String[] target, byte[] bytes, int offset, int numBytes) throws IOException {
+				return parseRecord(target, bytes, offset, numBytes) ? target : null;
+			}
+		};
+		format.setFilePath("file:///some/file/that/will/not/be/read");
 
-			Value[] values = new Value[] { new StringValue(), new StringValue(), new StringValue()};
+		for (String charset : new String[]{ "UTF-8", "UTF-16BE", "UTF-16LE" }) {
+			File tempFile = File.createTempFile("test_contents", "tmp");
+			tempFile.deleteOnExit();
 
+			// write string with proper encoding
+			try (Writer out = new OutputStreamWriter(new FileOutputStream(tempFile), charset)) {
+				out.write(fileContent);
+			}
+
+			FileInputSplit split = new FileInputSplit(0, new Path(tempFile.toURI().toString()),
+				0, tempFile.length(), new String[]{ "localhost" });
+
+			format.setFieldDelimiter(delimiter);
+			format.setFieldTypesGeneric(String.class, String.class, String.class);
+			// use the same encoding to parse the file as used to read the file;
+			// the field delimiter is reinterpreted when the charset is set
+			format.setCharset(charset);
+			format.configure(new Configuration());
+			format.open(split);
+
+			String[] values = new String[]{ "", "", "" };
 			values = format.nextRecord(values);
+
+			// validate results
 			assertNotNull(values);
-			assertEquals("\u00bf", ((StringValue) values[0]).getValue());
-			assertEquals("Flink", ((StringValue) values[1]).getValue());
-			assertEquals("\u00f1", ((StringValue) values[2]).getValue());
+			for (int i = 0 ; i < records.length ; i++) {
+				assertEquals(records[i], values[i]);
+			}
 
 			assertNull(format.nextRecord(values));
 			assertTrue(format.reachedEnd());
 		}
-		catch (Exception ex) {
-			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
+
+		format.close();
 	}
 
 	@Test
 	public void readWithEmptyField() {
 		try {
 			final String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
-			final FileInputSplit split = createTempFile(fileContent);	
+			final FileInputSplit split = createTempFile(fileContent);
 		
 			final Configuration parameters = new Configuration();
 
@@ -721,37 +741,26 @@ public class GenericCsvInputFormatTest {
 		}
 	}
 
-	private FileInputSplit createTempFile(String content) throws IOException {
-		this.tempFile = File.createTempFile("test_contents", "tmp");
-		this.tempFile.deleteOnExit();
-		
-		DataOutputStream dos = new DataOutputStream(new FileOutputStream(tempFile));
-		dos.writeBytes(content);
-		dos.close();
-			
-		return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
-	}
-
 	private FileInputSplit createTempDeflateFile(String content) throws IOException {
-		this.tempFile = File.createTempFile("test_contents", "tmp.deflate");
-		this.tempFile.deleteOnExit();
+		File tempFile = File.createTempFile("test_contents", "tmp.deflate");
+		tempFile.deleteOnExit();
 
 		DataOutputStream dos = new DataOutputStream(new DeflaterOutputStream(new FileOutputStream(tempFile)));
 		dos.writeBytes(content);
 		dos.close();
 
-		return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
+		return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
 	}
 
 	private FileInputSplit createTempGzipFile(String content) throws IOException {
-		this.tempFile = File.createTempFile("test_contents", "tmp.gz");
-		this.tempFile.deleteOnExit();
+		File tempFile = File.createTempFile("test_contents", "tmp.gz");
+		tempFile.deleteOnExit();
 
 		DataOutputStream dos = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(tempFile)));
 		dos.writeBytes(content);
 		dos.close();
 
-		return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
+		return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
 	}
 	
 	private Value[] createIntValues(int num) {

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
index 1c5579e..718274e 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
@@ -19,13 +19,15 @@
 
 package org.apache.flink.types.parser;
 
-import static org.junit.Assert.assertTrue;
-
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
 import org.junit.Test;
 
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class VarLengthStringParserTest {
 
@@ -200,7 +202,7 @@ public class VarLengthStringParserTest {
 	@Test
 	public void testParseValidMixedStringsWithCharset() {
 
-		Charset charset = Charset.forName("US-ASCII");
+		Charset charset = StandardCharsets.US_ASCII;
 		this.parser = new StringValueParser();
 		this.parser.enableQuotedStringParsing((byte) '@');
 
@@ -211,7 +213,7 @@ public class VarLengthStringParserTest {
 		int startPos = 0;
 		parser.setCharset(charset);
 		startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[]{'|'}, s);
-		assertTrue(startPos == 11);
-		assertTrue(s.getValue().equals("abcde|gh"));
+		assertEquals(11, startPos);
+		assertEquals("abcde|gh", s.getValue());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index b13b8aa..cbac386 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -18,24 +18,22 @@
 
 package org.apache.flink.api.java.io;
 
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DataSource;
+//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
+import org.apache.flink.api.java.tuple.*;
+//CHECKSTYLE.ON: AvoidStarImport
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Preconditions;
 
-//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
-import org.apache.flink.api.java.tuple.*;
-//CHECKSTYLE.ON: AvoidStarImport
+import java.util.ArrayList;
+import java.util.Arrays;
 
 /**
  * A builder class to instantiate a CSV parsing data source. The CSV reader configures the field types,
@@ -66,7 +64,7 @@ public class CsvReader {
 	
 	protected boolean ignoreInvalidLines = false;
 
-	private Charset charset = Charset.forName("UTF-8");
+	private String charset = "UTF-8";
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -162,11 +160,12 @@ public class CsvReader {
 	}
 
 	/**
-	 * Gets the character set for the reader. Default is set to UTF-8.
+	 * Gets the character set for the reader. Default is UTF-8.
 	 *
 	 * @return The charset for the reader.
 	 */
-	public Charset getCharset() {
+	@PublicEvolving
+	public String getCharset() {
 		return this.charset;
 	}
 
@@ -175,7 +174,8 @@ public class CsvReader {
 	 *
 	 * @param charset The character set to set.
 	 */
-	public void setCharset(Charset charset) {
+	@PublicEvolving
+	public void setCharset(String charset) {
 		this.charset = Preconditions.checkNotNull(charset);
 	}
 
@@ -356,12 +356,12 @@ public class CsvReader {
 	// --------------------------------------------------------------------------------------------
 	
 	private void configureInputFormat(CsvInputFormat<?> format) {
+		format.setCharset(this.charset);
 		format.setDelimiter(this.lineDelimiter);
 		format.setFieldDelimiter(this.fieldDelimiter);
 		format.setCommentPrefix(this.commentPrefix);
 		format.setSkipFirstLineAsHeader(skipFirstLineAsHeader);
 		format.setLenient(ignoreInvalidLines);
-		format.setCharset(this.charset);
 		if (this.parseQuotedStrings) {
 			format.enableQuotedStringParsing(this.quoteCharacter);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
index e1c8023..de57e5c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
@@ -18,15 +18,9 @@
 
 package org.apache.flink.api.java.io;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
@@ -47,7 +41,12 @@ import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.junit.Test;
-import org.apache.flink.api.java.ExecutionEnvironment;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 /**
  * Tests for the CSV reader builder.
@@ -80,11 +79,11 @@ public class CSVReaderTest {
 	@Test
 	public void testCharset() {
 		CsvReader reader = getCsvReader();
-		assertEquals(reader.getCharset(), Charset.forName("UTF-8"));
-		reader.setCharset(Charset.forName("US-ASCII"));
-		assertEquals(reader.getCharset(), Charset.forName("US-ASCII"));
+		assertEquals("UTF-8", reader.getCharset());
+		reader.setCharset("US-ASCII");
+		assertEquals("US-ASCII", reader.getCharset());
 	}
-	
+
 	@Test
 	public void testIncludeFieldsDense() {
 		CsvReader reader = getCsvReader();

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 54f226c..cc0d5bc 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -771,7 +771,7 @@ public class CsvInputFormatTest {
 		final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format = new TupleCsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH, typeInfo);
 
 		format.setSkipFirstLineAsHeader(true);
-		format.setFieldDelimiter(',');
+		format.setFieldDelimiter(",");
 
 		format.configure(new Configuration());
 		format.open(split);
@@ -1077,7 +1077,7 @@ public class CsvInputFormatTest {
 		CsvInputFormat<Tuple2<String, String>> inputFormat = new TupleCsvInputFormat<Tuple2<String, String>>(new Path(tempFile.toURI().toString()), typeInfo, new boolean[]{true, false, true});
 
 		inputFormat.enableQuotedStringParsing('"');
-		inputFormat.setFieldDelimiter('|');
+		inputFormat.setFieldDelimiter("|");
 		inputFormat.setDelimiter('\n');
 
 		inputFormat.configure(new Configuration());
@@ -1107,7 +1107,7 @@ public class CsvInputFormatTest {
 		CsvInputFormat<Tuple2<String, String>> inputFormat = new TupleCsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
 
 		inputFormat.enableQuotedStringParsing('"');
-		inputFormat.setFieldDelimiter('|');
+		inputFormat.setFieldDelimiter("|");
 		inputFormat.setDelimiter('\n');
 
 		inputFormat.configure(new Configuration());

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
index d176b79..d72e7a8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
@@ -656,7 +656,7 @@ class RowCsvInputFormatTest {
 
     val format = new RowCsvInputFormat(PATH, typeInfo)
     format.setSkipFirstLineAsHeader(true)
-    format.setFieldDelimiter(',')
+    format.setFieldDelimiter(",")
     format.configure(new Configuration)
     format.open(split)
 
@@ -745,7 +745,7 @@ class RowCsvInputFormatTest {
       rowTypeInfo = typeInfo,
       includedFieldsMask = Array(true, false, true))
     inputFormat.enableQuotedStringParsing('"')
-    inputFormat.setFieldDelimiter('|')
+    inputFormat.setFieldDelimiter("|")
     inputFormat.setDelimiter('\n')
     inputFormat.configure(new Configuration)
 
@@ -776,7 +776,7 @@ class RowCsvInputFormatTest {
       new Path(tempFile.toURI.toString),
       rowTypeInfo = typeInfo)
     inputFormat.enableQuotedStringParsing('"')
-    inputFormat.setFieldDelimiter('|')
+    inputFormat.setFieldDelimiter("|")
     inputFormat.setDelimiter('\n')
     inputFormat.configure(new Configuration)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index 24d86e7..539a257 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -56,7 +56,7 @@ class CsvInputFormatTest {
         createTypeInformation[(String, Integer, Double)]
           .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
       format.setDelimiter("\n")
-      format.setFieldDelimiter('|')
+      format.setFieldDelimiter("|")
       format.setCommentPrefix("#")
       val parameters = new Configuration
       format.configure(parameters)
@@ -98,7 +98,7 @@ class CsvInputFormatTest {
         createTypeInformation[(String, Integer, Double)]
           .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
       format.setDelimiter("\n")
-      format.setFieldDelimiter('|')
+      format.setFieldDelimiter("|")
       format.setCommentPrefix("//")
       val parameters = new Configuration
       format.configure(parameters)
@@ -443,7 +443,7 @@ class CsvInputFormatTest {
     val format = new PojoCsvInputFormat[POJOItem](PATH, typeInfo)
 
     format.setDelimiter('\n')
-    format.setFieldDelimiter(',')
+    format.setFieldDelimiter(",")
     format.configure(new Configuration)
     format.open(tempFile)
 
@@ -460,7 +460,7 @@ class CsvInputFormatTest {
     val format = new TupleCsvInputFormat[CaseClassItem](PATH, typeInfo)
 
     format.setDelimiter('\n')
-    format.setFieldDelimiter(',')
+    format.setFieldDelimiter(",")
     format.configure(new Configuration)
     format.open(tempFile)
 
@@ -477,7 +477,7 @@ class CsvInputFormatTest {
       PATH, typeInfo, Array("field2", "field1", "field3"))
 
     format.setDelimiter('\n')
-    format.setFieldDelimiter(',')
+    format.setFieldDelimiter(",")
     format.configure(new Configuration)
     format.open(tempFile)
 
@@ -495,7 +495,7 @@ class CsvInputFormatTest {
       Array(true, true, false, true, false))
 
     format.setDelimiter('\n')
-    format.setFieldDelimiter(',')
+    format.setFieldDelimiter(",")
     format.configure(new Configuration)
     format.open(tempFile)
 
@@ -511,7 +511,7 @@ class CsvInputFormatTest {
     val format = new PojoCsvInputFormat[TwitterPOJO](PATH, typeInfo)
 
     format.setDelimiter('\n')
-    format.setFieldDelimiter(',')
+    format.setFieldDelimiter(",")
     format.configure(new Configuration)
     format.open(tempFile)
 


[2/4] flink git commit: [FLINK-5039] Bump Avro version to 1.7.7.

Posted by fh...@apache.org.
[FLINK-5039] Bump Avro version to 1.7.7.

This closes #2953.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d8f03e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d8f03e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d8f03e7

Branch: refs/heads/master
Commit: 2d8f03e7ad12af3a0dcb7bec087c25f19a4fd03e
Parents: 677d0d9
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Dec 6 21:03:10 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 8 18:45:55 2016 +0100

----------------------------------------------------------------------
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d8f03e7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d3ddf92..04ba726 100644
--- a/pom.xml
+++ b/pom.xml
@@ -220,13 +220,13 @@ under the License.
 			<dependency>
 				<groupId>org.apache.avro</groupId>
 				<artifactId>avro</artifactId>
-				<version>1.7.6</version>
+				<version>1.7.7</version>
 			</dependency>
 			
 			<dependency>
 				<groupId>org.apache.avro</groupId>
 				<artifactId>avro-ipc</artifactId>
-				<version>1.7.6</version>
+				<version>1.7.7</version>
 			</dependency>
 
 			<!-- Make sure we use a consistent commons-cli version throughout the project -->