You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/08 20:23:23 UTC
[1/4] flink git commit: [FLINK-5226] [table] Use correct DataSetCostFactory and improve DataSetCalc costs.
Repository: flink
Updated Branches:
refs/heads/master 55d60615a -> 41d5875bf
[FLINK-5226] [table] Use correct DataSetCostFactory and improve DataSetCalc costs.
- Improved DataSetCalc costs make projections cheap and help to push them down.
This closes #2926.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/677d0d90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/677d0d90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/677d0d90
Branch: refs/heads/master
Commit: 677d0d9073952b6f4c745ac242ba4108364f2189
Parents: 55d6061
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Dec 2 15:28:16 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 8 18:45:42 2016 +0100
----------------------------------------------------------------------
.../flink/api/table/FlinkRelBuilder.scala | 2 +-
.../table/plan/nodes/dataset/DataSetCalc.scala | 14 ++++++--
.../table/plan/nodes/dataset/DataSetJoin.scala | 17 ++++++----
.../api/scala/batch/sql/SetOperatorsTest.scala | 14 ++++++--
.../api/scala/batch/sql/SingleRowJoinTest.scala | 34 ++++++++++++--------
5 files changed, 55 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
index da44ebb..8508e53 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
@@ -79,7 +79,7 @@ object FlinkRelBuilder {
val typeFactory = new FlinkTypeFactory(typeSystem)
// create context instances with Flink type factory
- val planner = new VolcanoPlanner(Contexts.empty())
+ val planner = new VolcanoPlanner(config.getCostFactory, Contexts.empty())
planner.setExecutor(config.getExecutor)
planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))
http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
index b8b74ad..c0881b7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
@@ -32,6 +32,8 @@ import TypeConverter._
import org.apache.flink.api.table.BatchTableEnvironment
import org.apache.calcite.rex._
+import scala.collection.JavaConverters._
+
/**
* Flink RelNode which matches along with LogicalCalc.
*
@@ -73,8 +75,16 @@ class DataSetCalc(
val child = this.getInput
val rowCnt = metadata.getRowCount(child)
- val exprCnt = calcProgram.getExprCount
- planner.getCostFactory.makeCost(rowCnt, rowCnt * exprCnt, 0)
+
+ // count the expressions that are neither input field references nor literals, i.e., computations,
+ // conditions, etc. We only want to account for computations, not for simple projections.
+ val compCnt = calcProgram.getExprList.asScala.toList.count {
+ case i: RexInputRef => false
+ case l: RexLiteral => false
+ case _ => true
+ }
+
+ planner.getCostFactory.makeCost(rowCnt, rowCnt * compCnt, 0)
}
override def estimateRowCount(metadata: RelMetadataQuery): Double = {
http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index 6d7a30e..ccd84ca 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -88,12 +88,17 @@ class DataSetJoin(
override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
- val children = this.getInputs
- children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
- val rowCnt = metadata.getRowCount(child)
- val rowSize = this.estimateRowSize(child.getRowType)
- cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
- }
+ val leftRowCnt = metadata.getRowCount(getLeft)
+ val leftRowSize = estimateRowSize(getLeft.getRowType)
+
+ val rightRowCnt = metadata.getRowCount(getRight)
+ val rightRowSize = estimateRowSize(getRight.getRowType)
+
+ val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+ val cpuCost = leftRowCnt + rightRowCnt
+ val rowCnt = leftRowCnt + rightRowCnt
+
+ planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
}
override def translateToPlan(
http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
index 7b2b497..d0c0400 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperatorsTest.scala
@@ -43,15 +43,23 @@ class SetOperatorsTest extends TableTestBase {
"DataSetCalc",
binaryNode(
"DataSetJoin",
- batchTableNode(1),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "b_long")
+ ),
unaryNode(
"DataSetAggregate",
- batchTableNode(0),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a_long")
+ ),
term("groupBy", "a_long"),
term("select", "a_long")
),
term("where", "=(a_long, b_long)"),
- term("join", "b_long", "b_int", "b_string", "a_long"),
+ term("join", "b_long", "a_long"),
term("joinType", "InnerJoin")
),
term("select", "true AS $f0", "a_long")
http://git-wip-us.apache.org/repos/asf/flink/blob/677d0d90/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala
index f56b9ae..49f61af 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SingleRowJoinTest.scala
@@ -48,20 +48,23 @@ class SingleRowJoinTest extends TableTestBase {
"DataSetUnion",
unaryNode(
"DataSetValues",
- batchTableNode(0),
- tuples(List(null, null)),
- term("values", "a1", "a2")
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a1")
+ ),
+ tuples(List(null)),
+ term("values", "a1")
),
- term("union","a1","a2")
+ term("union","a1")
),
term("select", "COUNT(a1) AS cnt")
),
- term("where", "true"),
+ term("where", "=(CAST(a1), cnt)"),
term("join", "a1", "a2", "cnt"),
term("joinType", "NestedLoopJoin")
),
- term("select", "a1", "a2"),
- term("where", "=(CAST(a1), cnt)")
+ term("select", "a1", "a2")
)
util.verifySql(query, expected)
@@ -89,20 +92,23 @@ class SingleRowJoinTest extends TableTestBase {
"DataSetUnion",
unaryNode(
"DataSetValues",
- batchTableNode(0),
- tuples(List(null, null)),
- term("values", "a1", "a2")
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a1")
+ ),
+ tuples(List(null)),
+ term("values", "a1")
),
- term("union","a1","a2")
+ term("union", "a1")
),
term("select", "COUNT(a1) AS cnt")
),
- term("where", "true"),
+ term("where", "<(a1, cnt)"),
term("join", "a1", "a2", "cnt"),
term("joinType", "NestedLoopJoin")
),
- term("select", "a1", "a2"),
- term("where", "<(a1, cnt)")
+ term("select", "a1", "a2")
)
util.verifySql(query, expected)
[3/4] flink git commit: [FLINK-3921] Add support to set encoding in CsvReader and StringParser.
Posted by fh...@apache.org.
[FLINK-3921] Add support to set encoding in CsvReader and StringParser.
This closes #2060.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2186af6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2186af6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2186af6
Branch: refs/heads/master
Commit: f2186af6702c9fe48c91d5c2d7748378984cd29b
Parents: 2d8f03e
Author: Joshi <re...@gmail.com>
Authored: Wed Jun 1 13:26:47 2016 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 8 18:47:36 2016 +0100
----------------------------------------------------------------------
.../api/common/io/GenericCsvInputFormat.java | 64 ++++++++++----------
.../apache/flink/types/parser/FieldParser.java | 38 +++++++++---
.../apache/flink/types/parser/StringParser.java | 8 +--
.../common/io/GenericCsvInputFormatTest.java | 40 ++++++++++--
.../types/parser/VarLengthStringParserTest.java | 20 ++++++
.../org/apache/flink/api/java/io/CsvReader.java | 24 +++++++-
.../apache/flink/api/java/io/CSVReaderTest.java | 9 +++
7 files changed, 155 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 85d9cd8..0ced22b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -25,14 +25,12 @@ import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.types.parser.StringParser;
import org.apache.flink.types.parser.StringValueParser;
import org.apache.flink.util.InstantiationUtil;
-
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;
@@ -48,9 +46,9 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
- /** The default charset to convert strings to bytes */
- private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
-
+ /** The charset used to convert strings to bytes */
+ private Charset charset = Charset.forName("UTF-8");
+
private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
private static final boolean[] EMPTY_INCLUDED = new boolean[0];
@@ -107,6 +105,11 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
super(filePath, null);
}
+ protected GenericCsvInputFormat(Path filePath, Charset charset) {
+ this(filePath);
+ this.charset = Preconditions.checkNotNull(charset);
+ }
+
// --------------------------------------------------------------------------------------------
public int getNumberOfFieldsTotal() {
@@ -121,32 +124,11 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
return commentPrefix;
}
- public void setCommentPrefix(byte[] commentPrefix) {
- this.commentPrefix = commentPrefix;
- }
-
- public void setCommentPrefix(char commentPrefix) {
- setCommentPrefix(String.valueOf(commentPrefix));
- }
-
public void setCommentPrefix(String commentPrefix) {
- setCommentPrefix(commentPrefix, UTF_8_CHARSET);
+ setCommentPrefix(commentPrefix, charset);
}
- public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
- if (charsetName == null) {
- throw new IllegalArgumentException("Charset name must not be null");
- }
-
- if (commentPrefix != null) {
- Charset charset = Charset.forName(charsetName);
- setCommentPrefix(commentPrefix, charset);
- } else {
- this.commentPrefix = null;
- }
- }
-
- public void setCommentPrefix(String commentPrefix, Charset charset) {
+ private void setCommentPrefix(String commentPrefix, Charset charset) {
if (charset == null) {
throw new IllegalArgumentException("Charset must not be null");
}
@@ -174,7 +156,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
}
public void setFieldDelimiter(String delimiter) {
- this.fieldDelim = delimiter.getBytes(UTF_8_CHARSET);
+ this.fieldDelim = delimiter.getBytes(charset);
}
public boolean isLenient() {
@@ -314,6 +296,25 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
this.fieldIncluded = includedMask;
}
+ /**
+ * Gets the character set for the parser. Default is set to UTF-8.
+ *
+ * @return The charset for the parser.
+ */
+ Charset getCharset() {
+ return this.charset;
+ }
+
+ /**
+ * Sets the charset of the input format. It is used to encode the field delimiter and comment
+ * prefix when they are set, and it is passed on to the field parsers.
+ *
+ * @param charset The character set to set.
+ */
+ public void setCharset(Charset charset) {
+ this.charset = Preconditions.checkNotNull(charset);
+ }
+
// --------------------------------------------------------------------------------------------
// Runtime methods
// --------------------------------------------------------------------------------------------
@@ -334,6 +335,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class);
+ p.setCharset(this.getCharset());
if (this.quotedStringParsing) {
if (p instanceof StringParser) {
((StringParser)p).enableQuotedStringParsing(this.quoteCharacter);
@@ -449,7 +451,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
// search for ending quote character, continue when it is escaped
i++;
- while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)){
+ while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)) {
i++;
}
i++;
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index 200d239..d9eeecc 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -19,11 +19,6 @@
package org.apache.flink.types.parser;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.types.BooleanValue;
import org.apache.flink.types.ByteValue;
@@ -34,6 +29,12 @@ import org.apache.flink.types.LongValue;
import org.apache.flink.types.ShortValue;
import org.apache.flink.types.StringValue;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* A FieldParser is used parse a field from a sequence of bytes. Fields occur in a byte sequence and are terminated
* by the end of the byte sequence or a delimiter.
@@ -77,9 +78,11 @@ public abstract class FieldParser<T> {
/** Invalid Boolean value **/
BOOLEAN_INVALID
}
-
+
+ private Charset charset = Charset.forName("UTF-8");
+
private ParseErrorState errorState = ParseErrorState.NONE;
-
+
/**
* Parses the value of a field from the byte array, taking care of properly reset
* the state of this parser.
@@ -217,7 +220,26 @@ public abstract class FieldParser<T> {
return limitedLength;
}
-
+
+ /**
+ * Gets the charset for the parser. Default is set to UTF-8.
+ *
+ * @return The charset for the parser.
+ */
+ public Charset getCharset() {
+ return this.charset;
+ }
+
+ /**
+ * Sets the charset the parser uses to decode field bytes. Called, e.g., by input formats
+ * such as GenericCsvInputFormat to configure the parser before parsing.
+ *
+ * @param charset The charset to set.
+ */
+ public void setCharset(Charset charset) {
+ this.charset = charset;
+ }
+
// --------------------------------------------------------------------------------------------
// Mapping from types to parsers
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
index 1a2c7e3..7b46a7e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
@@ -63,11 +63,11 @@ public class StringParser extends FieldParser<String> {
// check for proper termination
if (i == limit) {
// either by end of line
- this.result = new String(bytes, startPos + 1, i - startPos - 2);
+ this.result = new String(bytes, startPos + 1, i - startPos - 2, getCharset());
return limit;
} else if ( i < delimLimit && delimiterNext(bytes, i, delimiter)) {
// or following field delimiter
- this.result = new String(bytes, startPos + 1, i - startPos - 2);
+ this.result = new String(bytes, startPos + 1, i - startPos - 2, getCharset());
return i + delimiter.length;
} else {
// no proper termination
@@ -87,14 +87,14 @@ public class StringParser extends FieldParser<String> {
if (limit == startPos) {
setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
}
- this.result = new String(bytes, startPos, limit - startPos);
+ this.result = new String(bytes, startPos, limit - startPos, getCharset());
return limit;
} else {
// delimiter found.
if (i == startPos) {
setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
}
- this.result = new String(bytes, startPos, i - startPos);
+ this.result = new String(bytes, startPos, i - startPos, getCharset());
return i + delimiter.length;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
index e3215c6..d063ddc 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
@@ -28,6 +28,7 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPOutputStream;
@@ -485,7 +486,7 @@ public class GenericCsvInputFormatTest {
fail("Input format accepted on invalid input.");
}
catch (ParseException e) {
- ; // all good
+ // all good
}
}
catch (Exception ex) {
@@ -547,7 +548,38 @@ public class GenericCsvInputFormatTest {
fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
}
}
-
+
+ @Test
+ public void testReadWithCharset() throws IOException {
+ try {
+ final String fileContent = "\u00bf|Flink|\u00f1";
+ final FileInputSplit split = createTempFile(fileContent);
+
+ final Configuration parameters = new Configuration();
+
+ format.setCharset(Charset.forName("UTF-8"));
+ format.setFieldDelimiter("|");
+ format.setFieldTypesGeneric(StringValue.class, StringValue.class, StringValue.class);
+
+ format.configure(parameters);
+ format.open(split);
+
+ Value[] values = new Value[] { new StringValue(), new StringValue(), new StringValue()};
+
+ values = format.nextRecord(values);
+ assertNotNull(values);
+ assertEquals("\u00bf", ((StringValue) values[0]).getValue());
+ assertEquals("Flink", ((StringValue) values[1]).getValue());
+ assertEquals("\u00f1", ((StringValue) values[2]).getValue());
+
+ assertNull(format.nextRecord(values));
+ assertTrue(format.reachedEnd());
+ }
+ catch (Exception ex) {
+ fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+ }
+ }
+
@Test
public void readWithEmptyField() {
try {
@@ -722,7 +754,7 @@ public class GenericCsvInputFormatTest {
return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
}
- private final Value[] createIntValues(int num) {
+ private Value[] createIntValues(int num) {
Value[] v = new Value[num];
for (int i = 0; i < num; i++) {
@@ -732,7 +764,7 @@ public class GenericCsvInputFormatTest {
return v;
}
- private final Value[] createLongValues(int num) {
+ private Value[] createLongValues(int num) {
Value[] v = new Value[num];
for (int i = 0; i < num; i++) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
index 1fe8850..1c5579e 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.junit.Test;
+import java.nio.charset.Charset;
+
public class VarLengthStringParserTest {
public StringValueParser parser = new StringValueParser();
@@ -194,4 +196,22 @@ public class VarLengthStringParserTest {
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s);
assertTrue(startPos < 0);
}
+
+ @Test
+ public void testParseValidMixedStringsWithCharset() {
+
+ Charset charset = Charset.forName("US-ASCII");
+ this.parser = new StringValueParser();
+ this.parser.enableQuotedStringParsing((byte) '@');
+
+ // check valid strings without whitespaces and with a trailing delimiter
+ byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes();
+ StringValue s = new StringValue();
+
+ int startPos = 0;
+ parser.setCharset(charset);
+ startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[]{'|'}, s);
+ assertTrue(startPos == 11);
+ assertTrue(s.getValue().equals("abcde|gh"));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index 8be5dc2..b13b8aa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.io;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
@@ -64,6 +65,8 @@ public class CsvReader {
protected boolean skipFirstLineAsHeader = false;
protected boolean ignoreInvalidLines = false;
+
+ private Charset charset = Charset.forName("UTF-8");
// --------------------------------------------------------------------------------------------
@@ -157,7 +160,25 @@ public class CsvReader {
this.commentPrefix = commentPrefix;
return this;
}
-
+
+ /**
+ * Gets the character set for the reader. Default is set to UTF-8.
+ *
+ * @return The charset for the reader.
+ */
+ public Charset getCharset() {
+ return this.charset;
+ }
+
+ /**
+ * Sets the charset of the reader.
+ *
+ * @param charset The character set to set.
+ */
+ public void setCharset(Charset charset) {
+ this.charset = Preconditions.checkNotNull(charset);
+ }
+
/**
* Configures which fields of the CSV file should be included and which should be skipped. The
* parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
@@ -340,6 +361,7 @@ public class CsvReader {
format.setCommentPrefix(this.commentPrefix);
format.setSkipFirstLineAsHeader(skipFirstLineAsHeader);
format.setLenient(ignoreInvalidLines);
+ format.setCharset(this.charset);
if (this.parseQuotedStrings) {
format.enableQuotedStringParsing(this.quoteCharacter);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186af6/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
index 8b12315..e1c8023 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.Arrays;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -75,6 +76,14 @@ public class CSVReaderTest {
reader.ignoreComments("#");
assertEquals("#", reader.commentPrefix);
}
+
+ @Test
+ public void testCharset() {
+ CsvReader reader = getCsvReader();
+ assertEquals(reader.getCharset(), Charset.forName("UTF-8"));
+ reader.setCharset(Charset.forName("US-ASCII"));
+ assertEquals(reader.getCharset(), Charset.forName("US-ASCII"));
+ }
@Test
public void testIncludeFieldsDense() {
[4/4] flink git commit: [FLINK-3921] Add support to set encoding in CsvReader and StringParser.
Posted by fh...@apache.org.
[FLINK-3921] Add support to set encoding in CsvReader and StringParser.
- extends first commit.
This closes #2901.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/41d5875b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/41d5875b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/41d5875b
Branch: refs/heads/master
Commit: 41d5875bfc272f2cd5c7e8c8523036684865c1ce
Parents: f2186af
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Nov 28 12:43:47 2016 -0500
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 8 21:21:48 2016 +0100
----------------------------------------------------------------------
.../api/common/io/DelimitedInputFormat.java | 67 ++++++++--
.../api/common/io/GenericCsvInputFormat.java | 79 +++++-------
.../apache/flink/types/parser/FieldParser.java | 18 ++-
.../api/common/io/DelimitedInputFormatTest.java | 94 ++++++++++----
.../common/io/GenericCsvInputFormatTest.java | 125 ++++++++++---------
.../types/parser/VarLengthStringParserTest.java | 12 +-
.../org/apache/flink/api/java/io/CsvReader.java | 24 ++--
.../apache/flink/api/java/io/CSVReaderTest.java | 23 ++--
.../flink/api/java/io/CsvInputFormatTest.java | 6 +-
.../runtime/io/RowCsvInputFormatTest.scala | 6 +-
.../flink/api/scala/io/CsvInputFormatTest.scala | 14 +--
11 files changed, 273 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index fd02c82..5c8dfc1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -20,17 +20,18 @@ package org.apache.flink.api.common.io;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
@@ -56,8 +57,11 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
*/
private static final Logger LOG = LoggerFactory.getLogger(DelimitedInputFormat.class);
- /** The default charset to convert strings to bytes */
- private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
+ // The charset used to convert strings to bytes
+ private String charsetName = "UTF-8";
+
+ // Charset is not serializable
+ private transient Charset charset;
/**
* The default read buffer size = 1MB.
@@ -157,9 +161,12 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
// --------------------------------------------------------------------------------------------
// The configuration parameters. Configured on the instance and serialized to be shipped.
// --------------------------------------------------------------------------------------------
-
+
+ // The delimiter may be set with a byte-sequence or a String. In the latter
+ // case the byte representation is kept consistent with the current charset.
private byte[] delimiter = new byte[] {'\n'};
-
+ private String delimiterString = null;
+
private int lineLengthLimit = Integer.MAX_VALUE;
private int bufferSize = -1;
@@ -182,8 +189,42 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
}
loadConfigParameters(configuration);
}
-
-
+
+ /**
+ * Get the character set used for the row delimiter. This is also used by
+ * subclasses to interpret field delimiters, comment strings, and for
+ * configuring {@link FieldParser}s.
+ *
+ * @return the charset
+ */
+ @PublicEvolving
+ public Charset getCharset() {
+ if (this.charset == null) {
+ this.charset = Charset.forName(charsetName);
+ }
+ return this.charset;
+ }
+
+ /**
+ * Set the name of the character set used for the row delimiter. This is
+ * also used by subclasses to interpret field delimiters, comment strings,
+ * and for configuring {@link FieldParser}s.
+ *
+ * These fields are interpreted when set. Changing the charset thereafter
+ * may cause unexpected results.
+ *
+ * @param charset name of the charset
+ */
+ @PublicEvolving
+ public void setCharset(String charset) {
+ this.charsetName = Preconditions.checkNotNull(charset);
+ this.charset = null;
+
+ if (this.delimiterString != null) {
+ this.delimiter = delimiterString.getBytes(getCharset());
+ }
+ }
+
public byte[] getDelimiter() {
return delimiter;
}
@@ -193,6 +234,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
throw new IllegalArgumentException("Delimiter must not be null");
}
this.delimiter = delimiter;
+ this.delimiterString = null;
}
public void setDelimiter(char delimiter) {
@@ -203,7 +245,8 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
if (delimiter == null) {
throw new IllegalArgumentException("Delimiter must not be null");
}
- this.delimiter = delimiter.getBytes(UTF_8_CHARSET);
+ this.delimiter = delimiter.getBytes(getCharset());
+ this.delimiterString = delimiter;
}
public int getLineLengthLimit() {
@@ -264,7 +307,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
// --------------------------------------------------------------------------------------------
/**
- * Configures this input format by reading the path to the file from the configuration andge the string that
+ * Configures this input format by reading the path to the file from the configuration and the string that
* defines the record delimiter.
*
* @param parameters The configuration object to read the parameters from.
http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 0ced22b..20c643e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -25,12 +25,10 @@ import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.types.parser.StringParser;
import org.apache.flink.types.parser.StringValueParser;
import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;
@@ -46,9 +44,6 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
- /** The charset used to convert strings to bytes */
- private Charset charset = Charset.forName("UTF-8");
-
private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
private static final boolean[] EMPTY_INCLUDED = new boolean[0];
@@ -79,9 +74,12 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
private Class<?>[] fieldTypes = EMPTY_TYPES;
protected boolean[] fieldIncluded = EMPTY_INCLUDED;
-
+
+ // The byte representation of the delimiter is updated consistent with
+ // current charset.
private byte[] fieldDelim = DEFAULT_FIELD_DELIMITER;
-
+ private String fieldDelimString = null;
+
private boolean lenient;
private boolean skipFirstLineAsHeader;
@@ -90,7 +88,10 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
private byte quoteCharacter;
+ // The byte representation of the comment prefix is updated consistent with
+ // current charset.
protected byte[] commentPrefix = null;
+ private String commentPrefixString = null;
// --------------------------------------------------------------------------------------------
@@ -105,11 +106,6 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
super(filePath, null);
}
- protected GenericCsvInputFormat(Path filePath, Charset charset) {
- this(filePath);
- this.charset = Preconditions.checkNotNull(charset);
- }
-
// --------------------------------------------------------------------------------------------
public int getNumberOfFieldsTotal() {
@@ -120,43 +116,43 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
return this.fieldTypes.length;
}
+ @Override
+ public void setCharset(String charset) {
+ super.setCharset(charset);
+
+ if (this.fieldDelimString != null) {
+ this.fieldDelim = fieldDelimString.getBytes(getCharset());
+ }
+
+ if (this.commentPrefixString != null) {
+ this.commentPrefix = commentPrefixString.getBytes(getCharset());
+ }
+ }
+
public byte[] getCommentPrefix() {
return commentPrefix;
}
public void setCommentPrefix(String commentPrefix) {
- setCommentPrefix(commentPrefix, charset);
- }
-
- private void setCommentPrefix(String commentPrefix, Charset charset) {
- if (charset == null) {
- throw new IllegalArgumentException("Charset must not be null");
- }
if (commentPrefix != null) {
- this.commentPrefix = commentPrefix.getBytes(charset);
+ this.commentPrefix = commentPrefix.getBytes(getCharset());
} else {
this.commentPrefix = null;
}
+ this.commentPrefixString = commentPrefix;
}
public byte[] getFieldDelimiter() {
return fieldDelim;
}
- public void setFieldDelimiter(byte[] delimiter) {
+ public void setFieldDelimiter(String delimiter) {
if (delimiter == null) {
throw new IllegalArgumentException("Delimiter must not be null");
}
- this.fieldDelim = delimiter;
- }
-
- public void setFieldDelimiter(char delimiter) {
- setFieldDelimiter(String.valueOf(delimiter));
- }
-
- public void setFieldDelimiter(String delimiter) {
- this.fieldDelim = delimiter.getBytes(charset);
+ this.fieldDelim = delimiter.getBytes(getCharset());
+ this.fieldDelimString = delimiter;
}
public boolean isLenient() {
@@ -296,25 +292,6 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
this.fieldIncluded = includedMask;
}
- /**
- * Gets the character set for the parser. Default is set to UTF-8.
- *
- * @return The charset for the parser.
- */
- Charset getCharset() {
- return this.charset;
- }
-
- /**
- * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset
- * when doing a parse.
- *
- * @param charset The character set to set.
- */
- public void setCharset(Charset charset) {
- this.charset = Preconditions.checkNotNull(charset);
- }
-
// --------------------------------------------------------------------------------------------
// Runtime methods
// --------------------------------------------------------------------------------------------
@@ -322,7 +299,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
@Override
public void open(FileInputSplit split) throws IOException {
super.open(split);
-
+
// instantiate the parsers
FieldParser<?>[] parsers = new FieldParser<?>[fieldTypes.length];
@@ -335,7 +312,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class);
- p.setCharset(this.getCharset());
+ p.setCharset(getCharset());
if (this.quotedStringParsing) {
if (p instanceof StringParser) {
((StringParser)p).enableQuotedStringParsing(this.quoteCharacter);
http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index d9eeecc..cf3c83d 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -32,6 +32,7 @@ import org.apache.flink.types.StringValue;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@@ -79,7 +80,7 @@ public abstract class FieldParser<T> {
BOOLEAN_INVALID
}
- private Charset charset = Charset.forName("UTF-8");
+ private Charset charset = StandardCharsets.UTF_8;
private ParseErrorState errorState = ParseErrorState.NONE;
@@ -105,9 +106,7 @@ public abstract class FieldParser<T> {
/**
* Each parser's logic should be implemented inside this method
- *
- * @see {@link FieldParser#parseField(byte[], int, int, byte[], Object)}
- * */
+ */
protected abstract int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse);
/**
@@ -221,20 +220,19 @@ public abstract class FieldParser<T> {
return limitedLength;
}
- /*
- * Gets the Charset for the parser.Default is set to ASCII
+ /**
+ * Gets the character set used for this parser.
*
- * @return The charset for the parser.
+ * @return the charset used for this parser.
*/
public Charset getCharset() {
return this.charset;
}
/**
- * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset
- * when doing a parse.
+ * Sets the character set used for this parser.
*
- * @param charset The charset to set.
+ * @param charset charset used for this parser.
*/
public void setCharset(Charset charset) {
this.charset = charset;
http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 93d5f9f..219365a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -18,12 +18,13 @@
package org.apache.flink.api.common.io;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -33,17 +34,17 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStreamWriter;
+import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class DelimitedInputFormatTest {
@@ -200,6 +201,45 @@ public class DelimitedInputFormatTest {
}
}
+ @Test
+ public void testReadCustomDelimiterWithCharset() throws IOException {
+ // Unicode row fragments
+ String[] records = new String[]{"\u020e\u021f\u05c0\u020b\u020f", "Apache", "\nFlink", "\u0000", "\u05c0"};
+
+ // Unicode delimiter
+ String delimiter = "\u05c0\u05c0";
+
+ String fileContent = StringUtils.join(records, delimiter);
+
+ for (final String charset : new String[]{ "UTF-8", "UTF-16BE", "UTF-16LE" }) {
+ // use charset when instantiating the record String
+ DelimitedInputFormat<String> format = new DelimitedInputFormat<String>() {
+ @Override
+ public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+ return new String(bytes, offset, numBytes, charset);
+ }
+ };
+ format.setFilePath("file:///some/file/that/will/not/be/read");
+
+ final FileInputSplit split = createTempFile(fileContent, charset);
+
+ format.setDelimiter(delimiter);
+ // use the same encoding to parse the file as used to read the file;
+ // the delimiter is reinterpreted when the charset is set
+ format.setCharset(charset);
+ format.configure(new Configuration());
+ format.open(split);
+
+ for (String record : records) {
+ String value = format.nextRecord(null);
+ assertEquals(record, value);
+ }
+
+ assertNull(format.nextRecord(null));
+ assertTrue(format.reachedEnd());
+ }
+ }
+
/**
* Tests that the records are read correctly when the split boundary is in the middle of a record.
*/
@@ -363,19 +403,29 @@ public class DelimitedInputFormatTest {
fail(e.getMessage());
}
}
-
- private static FileInputSplit createTempFile(String contents) throws IOException {
+
+ static FileInputSplit createTempFile(String contents) throws IOException {
File tempFile = File.createTempFile("test_contents", "tmp");
tempFile.deleteOnExit();
-
- OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
- wrt.write(contents);
- wrt.close();
-
+
+ try (Writer out = new OutputStreamWriter(new FileOutputStream(tempFile))) {
+ out.write(contents);
+ }
+
return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
}
-
-
+
+ static FileInputSplit createTempFile(String contents, String charset) throws IOException {
+ File tempFile = File.createTempFile("test_contents", "tmp");
+ tempFile.deleteOnExit();
+
+ try (Writer out = new OutputStreamWriter(new FileOutputStream(tempFile), charset)) {
+ out.write(contents);
+ }
+
+ return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
+ }
+
protected static final class MyTextInputFormat extends DelimitedInputFormat<String> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
index d063ddc..c11a573 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
@@ -18,21 +18,7 @@
package org.apache.flink.api.common.io;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.GZIPOutputStream;
-
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
@@ -41,15 +27,29 @@ import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.Arrays;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static org.apache.flink.api.common.io.DelimitedInputFormatTest.createTempFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
public class GenericCsvInputFormatTest {
- private File tempFile;
-
private TestCsvInputFormat format;
// --------------------------------------------------------------------------------------------
@@ -65,9 +65,6 @@ public class GenericCsvInputFormatTest {
if (this.format != null) {
this.format.close();
}
- if (this.tempFile != null) {
- this.tempFile.delete();
- }
}
@Test
@@ -87,7 +84,7 @@ public class GenericCsvInputFormatTest {
public void testReadNoPosAll() throws IOException {
try {
final String fileContent = "111|222|333|444|555\n666|777|888|999|000|";
- final FileInputSplit split = createTempFile(fileContent);
+ final FileInputSplit split = createTempFile(fileContent);
final Configuration parameters = new Configuration();
@@ -485,8 +482,7 @@ public class GenericCsvInputFormatTest {
format.nextRecord(values);
fail("Input format accepted on invalid input.");
}
- catch (ParseException e) {
- // all good
+ catch (ParseException ignored) {
}
}
catch (Exception ex) {
@@ -551,40 +547,64 @@ public class GenericCsvInputFormatTest {
@Test
public void testReadWithCharset() throws IOException {
- try {
- final String fileContent = "\u00bf|Flink|\u00f1";
- final FileInputSplit split = createTempFile(fileContent);
+ // Unicode row fragments
+ String[] records = new String[]{"\u020e\u021f", "Flink", "\u020b\u020f"};
- final Configuration parameters = new Configuration();
+ // Unicode delimiter
+ String delimiter = "\u05c0\u05c0";
- format.setCharset(Charset.forName("UTF-8"));
- format.setFieldDelimiter("|");
- format.setFieldTypesGeneric(StringValue.class, StringValue.class, StringValue.class);
+ String fileContent = StringUtils.join(records, delimiter);
- format.configure(parameters);
- format.open(split);
+ // StringValueParser does not use charset so rely on StringParser
+ GenericCsvInputFormat<String[]> format = new GenericCsvInputFormat<String[]>() {
+ @Override
+ public String[] readRecord(String[] target, byte[] bytes, int offset, int numBytes) throws IOException {
+ return parseRecord(target, bytes, offset, numBytes) ? target : null;
+ }
+ };
+ format.setFilePath("file:///some/file/that/will/not/be/read");
- Value[] values = new Value[] { new StringValue(), new StringValue(), new StringValue()};
+ for (String charset : new String[]{ "UTF-8", "UTF-16BE", "UTF-16LE" }) {
+ File tempFile = File.createTempFile("test_contents", "tmp");
+ tempFile.deleteOnExit();
+ // write string with proper encoding
+ try (Writer out = new OutputStreamWriter(new FileOutputStream(tempFile), charset)) {
+ out.write(fileContent);
+ }
+
+ FileInputSplit split = new FileInputSplit(0, new Path(tempFile.toURI().toString()),
+ 0, tempFile.length(), new String[]{ "localhost" });
+
+ format.setFieldDelimiter(delimiter);
+ format.setFieldTypesGeneric(String.class, String.class, String.class);
+ // use the same encoding to parse the file as used to read the file;
+ // the field delimiter is reinterpreted when the charset is set
+ format.setCharset(charset);
+ format.configure(new Configuration());
+ format.open(split);
+
+ String[] values = new String[]{ "", "", "" };
values = format.nextRecord(values);
+
+ // validate results
assertNotNull(values);
- assertEquals("\u00bf", ((StringValue) values[0]).getValue());
- assertEquals("Flink", ((StringValue) values[1]).getValue());
- assertEquals("\u00f1", ((StringValue) values[2]).getValue());
+ for (int i = 0 ; i < records.length ; i++) {
+ assertEquals(records[i], values[i]);
+ }
assertNull(format.nextRecord(values));
assertTrue(format.reachedEnd());
}
- catch (Exception ex) {
- fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
- }
+
+ format.close();
}
@Test
public void readWithEmptyField() {
try {
final String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
- final FileInputSplit split = createTempFile(fileContent);
+ final FileInputSplit split = createTempFile(fileContent);
final Configuration parameters = new Configuration();
@@ -721,37 +741,26 @@ public class GenericCsvInputFormatTest {
}
}
- private FileInputSplit createTempFile(String content) throws IOException {
- this.tempFile = File.createTempFile("test_contents", "tmp");
- this.tempFile.deleteOnExit();
-
- DataOutputStream dos = new DataOutputStream(new FileOutputStream(tempFile));
- dos.writeBytes(content);
- dos.close();
-
- return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
- }
-
private FileInputSplit createTempDeflateFile(String content) throws IOException {
- this.tempFile = File.createTempFile("test_contents", "tmp.deflate");
- this.tempFile.deleteOnExit();
+ File tempFile = File.createTempFile("test_contents", "tmp.deflate");
+ tempFile.deleteOnExit();
DataOutputStream dos = new DataOutputStream(new DeflaterOutputStream(new FileOutputStream(tempFile)));
dos.writeBytes(content);
dos.close();
- return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
+ return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
}
private FileInputSplit createTempGzipFile(String content) throws IOException {
- this.tempFile = File.createTempFile("test_contents", "tmp.gz");
- this.tempFile.deleteOnExit();
+ File tempFile = File.createTempFile("test_contents", "tmp.gz");
+ tempFile.deleteOnExit();
DataOutputStream dos = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(tempFile)));
dos.writeBytes(content);
dos.close();
- return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
+ return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
}
private Value[] createIntValues(int num) {
http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
index 1c5579e..718274e 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
@@ -19,13 +19,15 @@
package org.apache.flink.types.parser;
-import static org.junit.Assert.assertTrue;
-
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.junit.Test;
import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class VarLengthStringParserTest {
@@ -200,7 +202,7 @@ public class VarLengthStringParserTest {
@Test
public void testParseValidMixedStringsWithCharset() {
- Charset charset = Charset.forName("US-ASCII");
+ Charset charset = StandardCharsets.US_ASCII;
this.parser = new StringValueParser();
this.parser.enableQuotedStringParsing((byte) '@');
@@ -211,7 +213,7 @@ public class VarLengthStringParserTest {
int startPos = 0;
parser.setCharset(charset);
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[]{'|'}, s);
- assertTrue(startPos == 11);
- assertTrue(s.getValue().equals("abcde|gh"));
+ assertEquals(11, startPos);
+ assertEquals("abcde|gh", s.getValue());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index b13b8aa..cbac386 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -18,24 +18,22 @@
package org.apache.flink.api.java.io;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.operators.DataSource;
+//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
+import org.apache.flink.api.java.tuple.*;
+//CHECKSTYLE.ON: AvoidStarImport
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Preconditions;
-//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
-import org.apache.flink.api.java.tuple.*;
-//CHECKSTYLE.ON: AvoidStarImport
+import java.util.ArrayList;
+import java.util.Arrays;
/**
* A builder class to instantiate a CSV parsing data source. The CSV reader configures the field types,
@@ -66,7 +64,7 @@ public class CsvReader {
protected boolean ignoreInvalidLines = false;
- private Charset charset = Charset.forName("UTF-8");
+ private String charset = "UTF-8";
// --------------------------------------------------------------------------------------------
@@ -162,11 +160,12 @@ public class CsvReader {
}
/**
- * Gets the character set for the reader. Default is set to UTF-8.
+ * Gets the character set for the reader. Default is UTF-8.
*
* @return The charset for the reader.
*/
- public Charset getCharset() {
+ @PublicEvolving
+ public String getCharset() {
return this.charset;
}
@@ -175,7 +174,8 @@ public class CsvReader {
*
* @param charset The character set to set.
*/
- public void setCharset(Charset charset) {
+ @PublicEvolving
+ public void setCharset(String charset) {
this.charset = Preconditions.checkNotNull(charset);
}
@@ -356,12 +356,12 @@ public class CsvReader {
// --------------------------------------------------------------------------------------------
private void configureInputFormat(CsvInputFormat<?> format) {
+ format.setCharset(this.charset);
format.setDelimiter(this.lineDelimiter);
format.setFieldDelimiter(this.fieldDelimiter);
format.setCommentPrefix(this.commentPrefix);
format.setSkipFirstLineAsHeader(skipFirstLineAsHeader);
format.setLenient(ignoreInvalidLines);
- format.setCharset(this.charset);
if (this.parseQuotedStrings) {
format.enableQuotedStringParsing(this.quoteCharacter);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
index e1c8023..de57e5c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
@@ -18,15 +18,9 @@
package org.apache.flink.api.java.io;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
@@ -47,7 +41,12 @@ import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.junit.Assert;
import org.junit.Test;
-import org.apache.flink.api.java.ExecutionEnvironment;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
/**
* Tests for the CSV reader builder.
@@ -80,11 +79,11 @@ public class CSVReaderTest {
@Test
public void testCharset() {
CsvReader reader = getCsvReader();
- assertEquals(reader.getCharset(), Charset.forName("UTF-8"));
- reader.setCharset(Charset.forName("US-ASCII"));
- assertEquals(reader.getCharset(), Charset.forName("US-ASCII"));
+ assertEquals("UTF-8", reader.getCharset());
+ reader.setCharset("US-ASCII");
+ assertEquals("US-ASCII", reader.getCharset());
}
-
+
@Test
public void testIncludeFieldsDense() {
CsvReader reader = getCsvReader();
http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 54f226c..cc0d5bc 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -771,7 +771,7 @@ public class CsvInputFormatTest {
final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format = new TupleCsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH, typeInfo);
format.setSkipFirstLineAsHeader(true);
- format.setFieldDelimiter(',');
+ format.setFieldDelimiter(",");
format.configure(new Configuration());
format.open(split);
@@ -1077,7 +1077,7 @@ public class CsvInputFormatTest {
CsvInputFormat<Tuple2<String, String>> inputFormat = new TupleCsvInputFormat<Tuple2<String, String>>(new Path(tempFile.toURI().toString()), typeInfo, new boolean[]{true, false, true});
inputFormat.enableQuotedStringParsing('"');
- inputFormat.setFieldDelimiter('|');
+ inputFormat.setFieldDelimiter("|");
inputFormat.setDelimiter('\n');
inputFormat.configure(new Configuration());
@@ -1107,7 +1107,7 @@ public class CsvInputFormatTest {
CsvInputFormat<Tuple2<String, String>> inputFormat = new TupleCsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
inputFormat.enableQuotedStringParsing('"');
- inputFormat.setFieldDelimiter('|');
+ inputFormat.setFieldDelimiter("|");
inputFormat.setDelimiter('\n');
inputFormat.configure(new Configuration());
http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
index d176b79..d72e7a8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
@@ -656,7 +656,7 @@ class RowCsvInputFormatTest {
val format = new RowCsvInputFormat(PATH, typeInfo)
format.setSkipFirstLineAsHeader(true)
- format.setFieldDelimiter(',')
+ format.setFieldDelimiter(",")
format.configure(new Configuration)
format.open(split)
@@ -745,7 +745,7 @@ class RowCsvInputFormatTest {
rowTypeInfo = typeInfo,
includedFieldsMask = Array(true, false, true))
inputFormat.enableQuotedStringParsing('"')
- inputFormat.setFieldDelimiter('|')
+ inputFormat.setFieldDelimiter("|")
inputFormat.setDelimiter('\n')
inputFormat.configure(new Configuration)
@@ -776,7 +776,7 @@ class RowCsvInputFormatTest {
new Path(tempFile.toURI.toString),
rowTypeInfo = typeInfo)
inputFormat.enableQuotedStringParsing('"')
- inputFormat.setFieldDelimiter('|')
+ inputFormat.setFieldDelimiter("|")
inputFormat.setDelimiter('\n')
inputFormat.configure(new Configuration)
http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index 24d86e7..539a257 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -56,7 +56,7 @@ class CsvInputFormatTest {
createTypeInformation[(String, Integer, Double)]
.asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
format.setDelimiter("\n")
- format.setFieldDelimiter('|')
+ format.setFieldDelimiter("|")
format.setCommentPrefix("#")
val parameters = new Configuration
format.configure(parameters)
@@ -98,7 +98,7 @@ class CsvInputFormatTest {
createTypeInformation[(String, Integer, Double)]
.asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
format.setDelimiter("\n")
- format.setFieldDelimiter('|')
+ format.setFieldDelimiter("|")
format.setCommentPrefix("//")
val parameters = new Configuration
format.configure(parameters)
@@ -443,7 +443,7 @@ class CsvInputFormatTest {
val format = new PojoCsvInputFormat[POJOItem](PATH, typeInfo)
format.setDelimiter('\n')
- format.setFieldDelimiter(',')
+ format.setFieldDelimiter(",")
format.configure(new Configuration)
format.open(tempFile)
@@ -460,7 +460,7 @@ class CsvInputFormatTest {
val format = new TupleCsvInputFormat[CaseClassItem](PATH, typeInfo)
format.setDelimiter('\n')
- format.setFieldDelimiter(',')
+ format.setFieldDelimiter(",")
format.configure(new Configuration)
format.open(tempFile)
@@ -477,7 +477,7 @@ class CsvInputFormatTest {
PATH, typeInfo, Array("field2", "field1", "field3"))
format.setDelimiter('\n')
- format.setFieldDelimiter(',')
+ format.setFieldDelimiter(",")
format.configure(new Configuration)
format.open(tempFile)
@@ -495,7 +495,7 @@ class CsvInputFormatTest {
Array(true, true, false, true, false))
format.setDelimiter('\n')
- format.setFieldDelimiter(',')
+ format.setFieldDelimiter(",")
format.configure(new Configuration)
format.open(tempFile)
@@ -511,7 +511,7 @@ class CsvInputFormatTest {
val format = new PojoCsvInputFormat[TwitterPOJO](PATH, typeInfo)
format.setDelimiter('\n')
- format.setFieldDelimiter(',')
+ format.setFieldDelimiter(",")
format.configure(new Configuration)
format.open(tempFile)
[2/4] flink git commit: [FLINK-5039] Bump Avro version to 1.7.7.
Posted by fh...@apache.org.
[FLINK-5039] Bump Avro version to 1.7.7.
This closes #2953.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d8f03e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d8f03e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d8f03e7
Branch: refs/heads/master
Commit: 2d8f03e7ad12af3a0dcb7bec087c25f19a4fd03e
Parents: 677d0d9
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Dec 6 21:03:10 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 8 18:45:55 2016 +0100
----------------------------------------------------------------------
pom.xml | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2d8f03e7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d3ddf92..04ba726 100644
--- a/pom.xml
+++ b/pom.xml
@@ -220,13 +220,13 @@ under the License.
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
- <version>1.7.6</version>
+ <version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
- <version>1.7.6</version>
+ <version>1.7.7</version>
</dependency>
<!-- Make sure we use a consistent commons-cli version throughout the project -->