You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:52:40 UTC

[07/50] [abbrv] flink git commit: [FLINK-4268] [core] Add a parsers for BigDecimal/BigInteger

[FLINK-4268] [core] Add a parsers for BigDecimal/BigInteger

This closes #2304.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c02988b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c02988b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c02988b

Branch: refs/heads/flip-6
Commit: 5c02988b05c56f524fc8c65b15e16b0c24278a5e
Parents: e58fd6e
Author: twalthr <tw...@apache.org>
Authored: Wed Jul 27 15:48:48 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Sep 20 16:43:59 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/types/parser/BigDecParser.java | 145 +++++++++++++++++++
 .../apache/flink/types/parser/BigIntParser.java | 126 ++++++++++++++++
 .../apache/flink/types/parser/FieldParser.java  |   4 +
 .../flink/types/parser/BigDecParserTest.java    |  79 ++++++++++
 .../flink/types/parser/BigIntParserTest.java    |  68 +++++++++
 5 files changed, 422 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c02988b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
new file mode 100644
index 0000000..46a07fa
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import java.math.BigDecimal;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Parses a text field into a {@link java.math.BigDecimal}.
+ */
+@PublicEvolving
+public class BigDecParser extends FieldParser<BigDecimal> {
+
+	private static final BigDecimal BIG_DECIMAL_INSTANCE = BigDecimal.ZERO;
+
+	private BigDecimal result;
+	private char[] reuse = null;
+
+	@Override
+	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, BigDecimal reusable) {
+		int i = startPos;
+
+		final int delimLimit = limit - delimiter.length + 1;
+
+		while (i < limit) {
+			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+				if (i == startPos) {
+					setErrorState(ParseErrorState.EMPTY_COLUMN);
+					return -1;
+				}
+				break;
+			}
+			i++;
+		}
+
+		if (i > startPos &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+			return -1;
+		}
+
+		try {
+			final int length = i - startPos;
+			if (reuse == null || reuse.length < length) {
+				reuse = new char[length];
+			}
+			for (int j = 0; j < length; j++) {
+				final byte b = bytes[startPos + j];
+				if ((b < '0' || b > '9') && b != '-' && b != '+' && b != '.' && b != 'E' && b != 'e') {
+					throw new NumberFormatException();
+				}
+				reuse[j] = (char) bytes[startPos + j];
+			}
+
+			this.result = new BigDecimal(reuse, 0, length);
+			return (i == limit) ? limit : i + delimiter.length;
+		} catch (NumberFormatException e) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
+			return -1;
+		}
+	}
+
+	@Override
+	public BigDecimal createValue() {
+		return BIG_DECIMAL_INSTANCE;
+	}
+
+	@Override
+	public BigDecimal getLastResult() {
+		return this.result;
+	}
+
+	/**
+	 * Static utility to parse a field of type BigDecimal from a byte sequence that represents text
+	 * characters
+	 * (such as when read from a file stream).
+	 *
+	 * @param bytes    The bytes containing the text data that should be parsed.
+	 * @param startPos The offset to start the parsing.
+	 * @param length   The length of the byte sequence (counting from the offset).
+	 * @return The parsed value.
+	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * represents not a correct number.
+	 */
+	public static final BigDecimal parseField(byte[] bytes, int startPos, int length) {
+		return parseField(bytes, startPos, length, (char) 0xffff);
+	}
+
+	/**
+	 * Static utility to parse a field of type BigDecimal from a byte sequence that represents text
+	 * characters
+	 * (such as when read from a file stream).
+	 *
+	 * @param bytes     The bytes containing the text data that should be parsed.
+	 * @param startPos  The offset to start the parsing.
+	 * @param length    The length of the byte sequence (counting from the offset).
+	 * @param delimiter The delimiter that terminates the field.
+	 * @return The parsed value.
+	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * represents not a correct number.
+	 */
+	public static final BigDecimal parseField(byte[] bytes, int startPos, int length, char delimiter) {
+		if (length <= 0) {
+			throw new NumberFormatException("Invalid input: Empty string");
+		}
+		int i = 0;
+		final byte delByte = (byte) delimiter;
+
+		while (i < length && bytes[startPos + i] != delByte) {
+			i++;
+		}
+
+		if (i > 0 &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) {
+			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
+		}
+
+		final char[] chars = new char[i];
+		for (int j = 0; j < i; j++) {
+			final byte b = bytes[startPos + j];
+			if ((b < '0' || b > '9') && b != '-' && b != '+' && b != '.' && b != 'E' && b != 'e') {
+				throw new NumberFormatException();
+			}
+			chars[j] = (char) bytes[startPos + j];
+		}
+		return new BigDecimal(chars);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5c02988b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
new file mode 100644
index 0000000..13361c1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import java.math.BigInteger;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Parses a text field into a {@link java.math.BigInteger}.
+ */
+@PublicEvolving
+public class BigIntParser extends FieldParser<BigInteger> {
+
+	private static final BigInteger BIG_INTEGER_INSTANCE = BigInteger.ZERO;
+
+	private BigInteger result;
+
+	@Override
+	public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, BigInteger reusable) {
+		int i = startPos;
+
+		final int delimLimit = limit - delimiter.length + 1;
+
+		while (i < limit) {
+			if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
+				if (i == startPos) {
+					setErrorState(ParseErrorState.EMPTY_COLUMN);
+					return -1;
+				}
+				break;
+			}
+			i++;
+		}
+
+		if (i > startPos &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+			return -1;
+		}
+
+		String str = new String(bytes, startPos, i - startPos);
+		try {
+			this.result = new BigInteger(str);
+			return (i == limit) ? limit : i + delimiter.length;
+		} catch (NumberFormatException e) {
+			setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
+			return -1;
+		}
+	}
+
+	@Override
+	public BigInteger createValue() {
+		return BIG_INTEGER_INSTANCE;
+	}
+
+	@Override
+	public BigInteger getLastResult() {
+		return this.result;
+	}
+
+	/**
+	 * Static utility to parse a field of type BigInteger from a byte sequence that represents text
+	 * characters
+	 * (such as when read from a file stream).
+	 *
+	 * @param bytes    The bytes containing the text data that should be parsed.
+	 * @param startPos The offset to start the parsing.
+	 * @param length   The length of the byte sequence (counting from the offset).
+	 * @return The parsed value.
+	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * represents not a correct number.
+	 */
+	public static final BigInteger parseField(byte[] bytes, int startPos, int length) {
+		return parseField(bytes, startPos, length, (char) 0xffff);
+	}
+
+	/**
+	 * Static utility to parse a field of type BigInteger from a byte sequence that represents text
+	 * characters
+	 * (such as when read from a file stream).
+	 *
+	 * @param bytes     The bytes containing the text data that should be parsed.
+	 * @param startPos  The offset to start the parsing.
+	 * @param length    The length of the byte sequence (counting from the offset).
+	 * @param delimiter The delimiter that terminates the field.
+	 * @return The parsed value.
+	 * @throws NumberFormatException Thrown when the value cannot be parsed because the text 
+	 * represents not a correct number.
+	 */
+	public static final BigInteger parseField(byte[] bytes, int startPos, int length, char delimiter) {
+		if (length <= 0) {
+			throw new NumberFormatException("Invalid input: Empty string");
+		}
+		int i = 0;
+		final byte delByte = (byte) delimiter;
+
+		while (i < length && bytes[startPos + i] != delByte) {
+			i++;
+		}
+
+		if (i > 0 &&
+				(Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) {
+			throw new NumberFormatException("There is leading or trailing whitespace in the numeric field.");
+		}
+
+		String str = new String(bytes, startPos, i);
+		return new BigInteger(str);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5c02988b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index 5f17840..a1b9c5f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -19,6 +19,8 @@
 
 package org.apache.flink.types.parser;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -208,6 +210,8 @@ public abstract class FieldParser<T> {
 		PARSERS.put(Float.class, FloatParser.class);
 		PARSERS.put(Double.class, DoubleParser.class);
 		PARSERS.put(Boolean.class, BooleanParser.class);
+		PARSERS.put(BigDecimal.class, BigDecParser.class);
+		PARSERS.put(BigInteger.class, BigIntParser.class);
 
 		// value types
 		PARSERS.put(ByteValue.class, ByteValueParser.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/5c02988b/flink-core/src/test/java/org/apache/flink/types/parser/BigDecParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/BigDecParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/BigDecParserTest.java
new file mode 100644
index 0000000..9e17565
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/BigDecParserTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+
+import java.math.BigDecimal;
+
+public class BigDecParserTest extends ParserTestBase<BigDecimal> {
+
+	@Override
+	public String[] getValidTestValues() {
+		return new String[] {
+			"-12.5E1000", "-12.5E100", "-10000", "-1.1", "-1", "-0.44",
+			"0", "0000000", "0e0", "0.000000000000000000000000001", "0.0000001",
+			"0.1234123413478523984729447", "1", "10000", "10E100000", "10E1000000000"
+		};
+	}
+
+	@Override
+	public BigDecimal[] getValidTestResults() {
+		return new BigDecimal[] {
+			new BigDecimal("-12.5E1000"),
+			new BigDecimal("-12.5E100"),
+			new BigDecimal("-10000"),
+			new BigDecimal("-1.1"),
+			new BigDecimal("-1"),
+			new BigDecimal("-0.44"),
+			new BigDecimal("0"),
+			new BigDecimal("0"),
+			new BigDecimal("0e0"),
+			new BigDecimal("0.000000000000000000000000001"),
+			new BigDecimal("0.0000001"),
+			new BigDecimal("0.1234123413478523984729447"),
+			new BigDecimal("1"),
+			new BigDecimal("10000"),
+			new BigDecimal("10E100000"),
+			new BigDecimal("10E1000000000"),
+		};
+	}
+
+	@Override
+	public String[] getInvalidTestValues() {
+		return new String[] {
+			"a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t"
+		};
+	}
+
+	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
+	public FieldParser<BigDecimal> getParser() {
+		return new BigDecParser();
+	}
+
+	@Override
+	public Class<BigDecimal> getTypeClass() {
+		return BigDecimal.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5c02988b/flink-core/src/test/java/org/apache/flink/types/parser/BigIntParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/BigIntParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/BigIntParserTest.java
new file mode 100644
index 0000000..473cb55
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/BigIntParserTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+
+import java.math.BigInteger;
+
+public class BigIntParserTest extends ParserTestBase<BigInteger> {
+
+	@Override
+	public String[] getValidTestValues() {
+		return new String[] {
+			"-8745979691234123413478523984729447", "-10000", "-1", "0",
+			"0000000", "8745979691234123413478523984729447"
+		};
+	}
+
+	@Override
+	public BigInteger[] getValidTestResults() {
+		return new BigInteger[] {
+			new BigInteger("-8745979691234123413478523984729447"),
+			new BigInteger("-10000"),
+			new BigInteger("-1"),
+			new BigInteger("0"),
+			new BigInteger("0"),
+			new BigInteger("8745979691234123413478523984729447")
+		};
+	}
+
+	@Override
+	public String[] getInvalidTestValues() {
+		return new String[] {
+			"1.1" ,"a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t"
+		};
+	}
+
+	@Override
+	public boolean allowsEmptyField() {
+		return false;
+	}
+
+	@Override
+	public FieldParser<BigInteger> getParser() {
+		return new BigIntParser();
+	}
+
+	@Override
+	public Class<BigInteger> getTypeClass() {
+		return BigInteger.class;
+	}
+}