Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/01 08:34:01 UTC

[jira] [Commented] (FLINK-2435) Add support for custom CSV field parsers

    [ https://issues.apache.org/jira/browse/FLINK-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633715#comment-16633715 ] 

ASF GitHub Bot commented on FLINK-2435:
---------------------------------------

ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader
URL: https://github.com/apache/flink/pull/5862#discussion_r221248122
 
 

 ##########
 File path: flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
 ##########
 @@ -262,54 +267,115 @@ public void setCharset(Charset charset) {
 	// --------------------------------------------------------------------------------------------
 	//  Mapping from types to parsers
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
-	 * Gets the parser for the type specified by the given class. Returns null, if no parser for that class
+	 * Provides an instance of {@link FieldParser} that corresponds to the specified type.
+	 * @param type a field type for which a {@link FieldParser} is needed.
+	 * @return if there is a custom parser for the specified field type - it is returned; then, if there is a default parser
+	 * responsible for the specified type - it is returned; otherwise, {@code null} is returned.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> FieldParser<T> getParserInstanceFor(Class<T> type) {
+		ParserFactory<T> parserFactory = (ParserFactory<T>) CUSTOM_PARSERS.get(type);
+		if (parserFactory == null) {
+			parserFactory = (ParserFactory<T>) DEFAULT_PARSERS.get(type);
+		}
+
+		if (parserFactory == null) {
+			return null;
+		}
+
+		return parserFactory.create();
+	}
+
+	/**
+	 * Gets the default parser for the type specified by the given class. Returns null, if no parser for that class
 	 * is known.
-	 * 
+	 *
 	 * @param type The class of the type to get the parser for.
 	 * @return The parser for the given type, or null, if no such parser exists.
 	 */
-	public static <T> Class<FieldParser<T>> getParserForType(Class<T> type) {
-		Class<? extends FieldParser<?>> parser = PARSERS.get(type);
-		if (parser == null) {
+	public static <T> Class<FieldParser<T>> getDefaultParserForType(Class<T> type) {
+		ParserFactory<?> parserFactory = DEFAULT_PARSERS.get(type);
+		if (parserFactory == null) {
 			return null;
 		} else {
 			@SuppressWarnings("unchecked")
-			Class<FieldParser<T>> typedParser = (Class<FieldParser<T>>) parser;
+			Class<FieldParser<T>> typedParser = (Class<FieldParser<T>>) parserFactory.getParserType();
 			return typedParser;
 		}
 	}
-	
-	private static final Map<Class<?>, Class<? extends FieldParser<?>>> PARSERS = 
-			new HashMap<Class<?>, Class<? extends FieldParser<?>>>();
-	
+
+	/**
+	 * Gets the custom parser for the type specified by the given class. Returns null, if no parser for that class
+	 * is known.
+	 *
+	 * @param type The class of the type to get the parser for.
+	 * @return The parser for the given type, or null, if no such parser exists.
+	 */
+	public static <T> Class<? extends FieldParser<T>> getCustomParserForType(Class<T> type) {
+		synchronized (CUSTOM_PARSERS) {
+			ParserFactory<T> parserFactory = (ParserFactory<T>) CUSTOM_PARSERS.get(type);
+			if (parserFactory == null) {
+				return null;
+			} else {
+				return parserFactory.getParserType();
+			}
+		}
+	}
+
+	private static final Map<Class<?>, ParserFactory<?>> DEFAULT_PARSERS = new HashMap<>();
+
 	static {
 		// basic types
-		PARSERS.put(Byte.class, ByteParser.class);
-		PARSERS.put(Short.class, ShortParser.class);
-		PARSERS.put(Integer.class, IntParser.class);
-		PARSERS.put(Long.class, LongParser.class);
-		PARSERS.put(String.class, StringParser.class);
-		PARSERS.put(Float.class, FloatParser.class);
-		PARSERS.put(Double.class, DoubleParser.class);
-		PARSERS.put(Boolean.class, BooleanParser.class);
-		PARSERS.put(BigDecimal.class, BigDecParser.class);
-		PARSERS.put(BigInteger.class, BigIntParser.class);
+		DEFAULT_PARSERS.put(Byte.class, new DefaultParserFactory<>(ByteParser.class));
+		DEFAULT_PARSERS.put(Short.class, new DefaultParserFactory<>(ShortParser.class));
+		DEFAULT_PARSERS.put(Integer.class, new DefaultParserFactory<>(IntParser.class));
+		DEFAULT_PARSERS.put(Long.class, new DefaultParserFactory<>(LongParser.class));
+		DEFAULT_PARSERS.put(String.class, new DefaultParserFactory<>(StringParser.class));
+		DEFAULT_PARSERS.put(Float.class, new DefaultParserFactory<>(FloatParser.class));
+		DEFAULT_PARSERS.put(Double.class, new DefaultParserFactory<>(DoubleParser.class));
+		DEFAULT_PARSERS.put(Boolean.class, new DefaultParserFactory<>(BooleanParser.class));
+		DEFAULT_PARSERS.put(BigDecimal.class, new DefaultParserFactory<>(BigDecParser.class));
+		DEFAULT_PARSERS.put(BigInteger.class, new DefaultParserFactory<>(BigIntParser.class));
 
 		// value types
-		PARSERS.put(ByteValue.class, ByteValueParser.class);
-		PARSERS.put(ShortValue.class, ShortValueParser.class);
-		PARSERS.put(IntValue.class, IntValueParser.class);
-		PARSERS.put(LongValue.class, LongValueParser.class);
-		PARSERS.put(StringValue.class, StringValueParser.class);
-		PARSERS.put(FloatValue.class, FloatValueParser.class);
-		PARSERS.put(DoubleValue.class, DoubleValueParser.class);
-		PARSERS.put(BooleanValue.class, BooleanValueParser.class);
+		DEFAULT_PARSERS.put(ByteValue.class, new DefaultParserFactory<>(ByteValueParser.class));
+		DEFAULT_PARSERS.put(ShortValue.class, new DefaultParserFactory<>(ShortValueParser.class));
+		DEFAULT_PARSERS.put(IntValue.class, new DefaultParserFactory<>(IntValueParser.class));
+		DEFAULT_PARSERS.put(LongValue.class, new DefaultParserFactory<>(LongValueParser.class));
+		DEFAULT_PARSERS.put(StringValue.class, new DefaultParserFactory<>(StringValueParser.class));
+		DEFAULT_PARSERS.put(FloatValue.class, new DefaultParserFactory<>(FloatValueParser.class));
+		DEFAULT_PARSERS.put(DoubleValue.class, new DefaultParserFactory<>(DoubleValueParser.class));
+		DEFAULT_PARSERS.put(BooleanValue.class, new DefaultParserFactory<>(BooleanValueParser.class));
 
 		// SQL date/time types
-		PARSERS.put(java.sql.Time.class, SqlTimeParser.class);
-		PARSERS.put(java.sql.Date.class, SqlDateParser.class);
-		PARSERS.put(java.sql.Timestamp.class, SqlTimestampParser.class);
+		DEFAULT_PARSERS.put(java.sql.Time.class, new DefaultParserFactory<>(SqlTimeParser.class));
+		DEFAULT_PARSERS.put(java.sql.Date.class, new DefaultParserFactory<>(SqlDateParser.class));
+		DEFAULT_PARSERS.put(java.sql.Timestamp.class, new DefaultParserFactory<>(SqlTimestampParser.class));
 	}
+
+	private static final Map<Class<?>, ParserFactory<?>> CUSTOM_PARSERS = new HashMap<>();
+
+	/**
+	 * Registers a user-defined (custom) type with a parser factory for it.
+	 * Custom type parsing precedes default one.
+	 * @param type a user-defined type
+	 * @param factory the type's parser factory.
+	 * @return the registration status: 1 - registration is successful, -1 - otherwise.
+	 */
+	public static <T> int registerCustomParser(Class<T> type, ParserFactory<T> factory) {
+		Preconditions.checkNotNull(type, "The type must be not null.");
+		Preconditions.checkNotNull(factory, "The factory must be not null.");
+
+		synchronized (CUSTOM_PARSERS) {
+			if (CUSTOM_PARSERS.containsKey(type)) {
+				LOG.warn("'{}' type is already registered with '{}' parser. Skipping.");
 
 Review comment:
   You don't pass any arguments for the `{}` placeholders in this log message.
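
To illustrate the point: with SLF4J-style parameterized logging, each `{}` is filled from the arguments that follow the format string, and a call without arguments emits the placeholders literally. The sketch below is dependency-free and uses a hypothetical stand-in `format` helper, not the SLF4J API itself. The argument values are assumed for illustration; which values the PR should actually log (presumably the type and the parser already registered for it) is for the author to decide.

```java
// Hypothetical stand-in for SLF4J-style "{}" substitution; not the real SLF4J API.
final class PlaceholderDemo {

    // Replaces each "{}" in order with the next argument; leftover "{}" stay as-is.
    static String format(String pattern, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = pattern.indexOf("{}", from);
            if (at < 0) {
                break; // more arguments than placeholders
            }
            out.append(pattern, from, at).append(arg);
            from = at + 2;
        }
        return out.append(pattern.substring(from)).toString();
    }

    public static void main(String[] args) {
        String msg = "'{}' type is already registered with '{}' parser. Skipping.";
        // As in the diff: no arguments, so the placeholders are emitted literally.
        System.out.println(format(msg));
        // With arguments (assumed values, for illustration only).
        System.out.println(format(msg, "java.util.UUID", "UuidParser"));
    }
}
```

In the PR itself the equivalent change would be to pass two arguments after the format string in the `LOG.warn(...)` call.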


> Add support for custom CSV field parsers
> ----------------------------------------
>
>                 Key: FLINK-2435
>                 URL: https://issues.apache.org/jira/browse/FLINK-2435
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataSet API
>    Affects Versions: 0.10.0
>            Reporter: Fabian Hueske
>            Assignee: Dmitrii Kober
>            Priority: Minor
>              Labels: pull-request-available
>
> The {{CSVInputFormats}} only have {{FieldParsers}} for Java's basic types (byte, short, int, long, float, double, boolean, and String).
> It would be good to add support for CSV field parsers for custom data types which can be registered in a {{CSVReader}}. 
> We could offer two interfaces for field parsers.
> 1. The regular low-level {{FieldParser}} which operates on a byte array and offsets.
> 2. A {{StringFieldParser}} which operates on a String that has been extracted by a {{StringParser}} before. This interface will be easier to implement but less efficient.
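
The String-based variant (option 2) could look roughly like the sketch below, together with a registry in which custom parsers take precedence over defaults, as in the Javadoc of `getParserInstanceFor` in the diff. `StringFieldParser`, `ParserRegistry`, and every method name here are hypothetical and not Flink's actual API. The low-level byte-array variant (option 1) is not covered.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical String-based parser interface (option 2 in the issue description).
interface StringFieldParser<T> {
    T parse(String field);
}

// Hypothetical registry: a custom parser, if registered, wins over the default one.
final class ParserRegistry {
    private static final Map<Class<?>, Supplier<? extends StringFieldParser<?>>> DEFAULTS = new HashMap<>();
    private static final Map<Class<?>, Supplier<? extends StringFieldParser<?>>> CUSTOM = new HashMap<>();

    static {
        putDefault(Integer.class, () -> Integer::valueOf);
        putDefault(String.class, () -> s -> s);
    }

    // Type-safe helper so a type can only be paired with a matching parser factory.
    private static <T> void putDefault(Class<T> type, Supplier<StringFieldParser<T>> factory) {
        DEFAULTS.put(type, factory);
    }

    // Returns true if registered, false if the type already has a custom parser.
    static synchronized <T> boolean registerCustom(Class<T> type, Supplier<StringFieldParser<T>> factory) {
        return CUSTOM.putIfAbsent(type, factory) == null;
    }

    // Looks up a custom parser first, then a default one; returns null if neither exists.
    @SuppressWarnings("unchecked")
    static synchronized <T> StringFieldParser<T> parserFor(Class<T> type) {
        Supplier<? extends StringFieldParser<?>> factory = CUSTOM.get(type);
        if (factory == null) {
            factory = DEFAULTS.get(type);
        }
        return factory == null ? null : (StringFieldParser<T>) factory.get();
    }
}

final class RegistryDemo {
    public static void main(String[] args) {
        // Default parser for Integer.
        System.out.println(ParserRegistry.parserFor(Integer.class).parse("42"));
        // A custom parser that tolerates surrounding whitespace overrides the default.
        ParserRegistry.registerCustom(Integer.class, () -> s -> Integer.valueOf(s.trim()));
        System.out.println(ParserRegistry.parserFor(Integer.class).parse(" 7 "));
    }
}
```

Returning a `boolean` from registration is a design choice of this sketch; the PR under review returns an `int` status instead.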


