You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/15 12:56:18 UTC

[flink] branch master updated: [FLINK-11727][formats] Fixed JSON format issues after serialization

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 38e4e2b  [FLINK-11727][formats] Fixed JSON format issues after serialization
38e4e2b is described below

commit 38e4e2b8f9bc63a793a2bddef5a578e3f80b7376
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Mar 5 10:45:20 2019 +0100

    [FLINK-11727][formats] Fixed JSON format issues after serialization
    
    This commit reworks the JSON format to use a runtime converter that is
    created from the given TypeInformation. Before this commit, the conversion
    logic relied on reference comparison of TypeInformation instances, which
    did not work after the format had been serialized.
    
    This also introduces a builder pattern for ensuring future immutability
    of schemas.
    
    This closes #7932.
---
 .../test-scripts/kafka_sql_common.sh               |  19 +-
 .../formats/json/JsonRowDeserializationSchema.java | 394 +++++++++++++++------
 .../flink/formats/json/JsonRowFormatFactory.java   |  13 +-
 .../formats/json/JsonRowSerializationSchema.java   | 332 +++++++++++------
 .../org/apache/flink/formats/json/TimeFormats.java |  46 +++
 .../json/JsonRowDeserializationSchemaTest.java     |  66 ++--
 .../formats/json/JsonRowFormatFactoryTest.java     |  12 +-
 .../json/JsonRowSerializationSchemaTest.java       |  63 ++--
 .../utils/DeserializationSchemaMatcher.java        | 164 +++++++++
 .../formats/utils/SerializationSchemaMatcher.java  | 192 ++++++++++
 10 files changed, 1020 insertions(+), 281 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
index 85dbbae..37744d6 100644
--- a/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
+++ b/flink-end-to-end-tests/test-scripts/kafka_sql_common.sh
@@ -31,14 +31,14 @@ function create_kafka_json_source {
     # put JSON data into Kafka
     echo "Sending messages to Kafka..."
 
-    send_messages_to_kafka '{"timestamp": "2018-03-12 08:00:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' $topicName
-    send_messages_to_kafka '{"timestamp": "2018-03-12 08:10:00", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' $topicName
-    send_messages_to_kafka '{"timestamp": "2018-03-12 09:00:00", "user": "Bob", "event": { "type": "WARNING", "message": "This is another warning."}}' $topicName
-    send_messages_to_kafka '{"timestamp": "2018-03-12 09:10:00", "user": "Alice", "event": { "type": "INFO", "message": "This is a info."}}' $topicName
-    send_messages_to_kafka '{"timestamp": "2018-03-12 09:20:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' $topicName
-    send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' $topicName
-    send_messages_to_kafka '{"timestamp": "2018-03-12 09:30:00", "user": null, "event": { "type": "WARNING", "message": "This is a bad message because the user is missing."}}' $topicName
-    send_messages_to_kafka '{"timestamp": "2018-03-12 10:40:00", "user": "Bob", "event": { "type": "ERROR", "message": "This is an error."}}' $topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T08:00:00Z", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' $topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T08:10:00Z", "user": "Alice", "event": { "type": "WARNING", "message": "This is a warning."}}' $topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T09:00:00Z", "user": "Bob", "event": { "type": "WARNING", "message": "This is another warning."}}' $topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T09:10:00Z", "user": "Alice", "event": { "type": "INFO", "message": "This is a info."}}' $topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T09:20:00Z", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' $topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T09:30:00Z", "user": "Steve", "event": { "type": "INFO", "message": "This is another info."}}' $topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T09:30:00Z", "user": null, "event": { "type": "WARNING", "message": "This is a bad message because the user is missing."}}' $topicName
+    send_messages_to_kafka '{"timestamp": "2018-03-12T10:40:00Z", "user": "Bob", "event": { "type": "ERROR", "message": "This is an error."}}' $topicName
 }
 
 function get_kafka_json_source_schema {
@@ -79,7 +79,8 @@ function get_kafka_json_source_schema {
           "type": "object",
           "properties": {
             "timestamp": {
-              "type": "string"
+              "type": "string",
+              "format": "date-time"
             },
             "user": {
               "type": ["string", "null"]
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index df1fbc5..4a1ff27 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -27,18 +27,39 @@ import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingRuntimeException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
 import java.io.IOException;
-import java.lang.reflect.Array;
+import java.io.Serializable;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static java.util.Spliterators.spliterator;
+import static java.util.stream.StreamSupport.stream;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Deserialization schema from JSON to Flink types.
@@ -54,45 +75,56 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 	private static final long serialVersionUID = -228294330688809195L;
 
 	/** Type information describing the result type. */
-	private final TypeInformation<Row> typeInfo;
+	private final RowTypeInfo typeInfo;
+
+	private boolean failOnMissingField;
 
 	/** Object mapper for parsing the JSON. */
 	private final ObjectMapper objectMapper = new ObjectMapper();
 
-	/** Flag indicating whether to fail on a missing field. */
-	private boolean failOnMissingField;
+	private DeserializationRuntimeConverter runtimeConverter;
+
+	private JsonRowDeserializationSchema(
+			TypeInformation<Row> typeInfo,
+			boolean failOnMissingField) {
+		checkNotNull(typeInfo, "Type information");
+		checkArgument(typeInfo instanceof RowTypeInfo, "Only RowTypeInfo is supported");
+		this.typeInfo = (RowTypeInfo) typeInfo;
+		this.failOnMissingField = failOnMissingField;
+		this.runtimeConverter = createConverter(this.typeInfo);
+	}
 
 	/**
-	 * Creates a JSON deserialization schema for the given type information.
-	 *
-	 * @param typeInfo Type information describing the result type. The field names of {@link Row}
-	 *                 are used to parse the JSON properties.
+	 * @deprecated Use the provided {@link Builder} instead.
 	 */
+	@Deprecated
 	public JsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
-		Preconditions.checkNotNull(typeInfo, "Type information");
-		this.typeInfo = typeInfo;
-
-		if (!(typeInfo instanceof RowTypeInfo)) {
-			throw new IllegalArgumentException("Row type information expected.");
-		}
+		this(typeInfo, false);
 	}
 
 	/**
-	 * Creates a JSON deserialization schema for the given JSON schema.
-	 *
-	 * @param jsonSchema JSON schema describing the result type
-	 *
-	 * @see <a href="http://json-schema.org/">http://json-schema.org/</a>
+	 * @deprecated Use the provided {@link Builder} instead.
 	 */
+	@Deprecated
 	public JsonRowDeserializationSchema(String jsonSchema) {
-		this(JsonRowSchemaConverter.convert(jsonSchema));
+		this(JsonRowSchemaConverter.convert(checkNotNull(jsonSchema)), false);
+	}
+
+	/**
+	 * @deprecated Use the provided {@link Builder} instead.
+	 */
+	@Deprecated
+	public void setFailOnMissingField(boolean failOnMissingField) {
+		// TODO make this class immutable once we drop this method
+		this.failOnMissingField = failOnMissingField;
+		this.runtimeConverter = createConverter(this.typeInfo);
 	}
 
 	@Override
 	public Row deserialize(byte[] message) throws IOException {
 		try {
 			final JsonNode root = objectMapper.readTree(message);
-			return convertRow(root, (RowTypeInfo) typeInfo);
+			return (Row) runtimeConverter.convert(objectMapper, root);
 		} catch (Throwable t) {
 			throw new IOException("Failed to deserialize JSON object.", t);
 		}
@@ -109,14 +141,48 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 	}
 
 	/**
-	 * Configures the failure behaviour if a JSON field is missing.
-	 *
-	 * <p>By default, a missing field is ignored and the field is set to null.
-	 *
-	 * @param failOnMissingField Flag indicating whether to fail or not on a missing field.
+	 * Builder for {@link JsonRowDeserializationSchema}.
 	 */
-	public void setFailOnMissingField(boolean failOnMissingField) {
-		this.failOnMissingField = failOnMissingField;
+	public static class Builder {
+
+		private final RowTypeInfo typeInfo;
+		private boolean failOnMissingField = false;
+
+		/**
+		 * Creates a JSON deserialization schema for the given type information.
+		 *
+		 * @param typeInfo Type information describing the result type. The field names of {@link Row}
+		 *                 are used to parse the JSON properties.
+		 */
+		public Builder(TypeInformation<Row> typeInfo) {
+			checkArgument(typeInfo instanceof RowTypeInfo, "Only RowTypeInfo is supported");
+			this.typeInfo = (RowTypeInfo) typeInfo;
+		}
+
+		/**
+		 * Creates a JSON deserialization schema for the given JSON schema.
+		 *
+		 * @param jsonSchema JSON schema describing the result type
+		 *
+		 * @see <a href="http://json-schema.org/">http://json-schema.org/</a>
+		 */
+		public Builder(String jsonSchema) {
+			this(JsonRowSchemaConverter.convert(checkNotNull(jsonSchema)));
+		}
+
+		/**
+		 * Configures schema to fail if a JSON field is missing.
+		 *
+		 * <p>By default, a missing field is ignored and the field is set to null.
+		 */
+		public Builder failOnMissingField() {
+			this.failOnMissingField = true;
+			return this;
+		}
+
+		public JsonRowDeserializationSchema build() {
+			return new JsonRowDeserializationSchema(typeInfo, failOnMissingField);
+		}
 	}
 
 	@Override
@@ -128,7 +194,8 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 			return false;
 		}
 		final JsonRowDeserializationSchema that = (JsonRowDeserializationSchema) o;
-		return failOnMissingField == that.failOnMissingField && Objects.equals(typeInfo, that.typeInfo);
+		return Objects.equals(typeInfo, that.typeInfo) &&
+			Objects.equals(failOnMissingField, that.failOnMissingField);
 	}
 
 	@Override
@@ -136,99 +203,214 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 		return Objects.hash(typeInfo, failOnMissingField);
 	}
 
-	// --------------------------------------------------------------------------------------------
-
-	private Object convert(JsonNode node, TypeInformation<?> info) {
-		if (info == Types.VOID || node.isNull()) {
-			return null;
-		} else if (info == Types.BOOLEAN) {
-			return node.asBoolean();
-		} else if (info == Types.STRING) {
-			return node.asText();
-		} else if (info == Types.BIG_DEC) {
-			return node.decimalValue();
-		} else if (info == Types.BIG_INT) {
-			return node.bigIntegerValue();
-		} else if (info == Types.SQL_DATE) {
-			return Date.valueOf(node.asText());
-		} else if (info == Types.SQL_TIME) {
-			// according to RFC 3339 every full-time must have a timezone;
-			// until we have full timezone support, we only support UTC;
-			// users can parse their time as string as a workaround
-			final String time = node.asText();
-			if (time.indexOf('Z') < 0 || time.indexOf('.') >= 0) {
-				throw new IllegalStateException(
-					"Invalid time format. Only a time in UTC timezone without milliseconds is supported yet. " +
-						"Format: HH:mm:ss'Z'");
+	/*
+		Runtime converter
+	 */
+
+	/**
+	 * Runtime converter that maps between {@link JsonNode}s and Java objects.
+	 */
+	@FunctionalInterface
+	private interface DeserializationRuntimeConverter extends Serializable {
+		Object convert(ObjectMapper mapper, JsonNode jsonNode);
+	}
+
+	private DeserializationRuntimeConverter createConverter(TypeInformation<?> typeInfo) {
+		DeserializationRuntimeConverter baseConverter = createConverterForSimpleType(typeInfo)
+			.orElseGet(() ->
+				createContainerConverter(typeInfo)
+					.orElseGet(() -> createFallbackConverter(typeInfo.getTypeClass())));
+		return wrapIntoNullableConverter(baseConverter);
+	}
+
+	private DeserializationRuntimeConverter wrapIntoNullableConverter(DeserializationRuntimeConverter converter) {
+		return (mapper, jsonNode) -> {
+			if (jsonNode.isNull()) {
+				return null;
 			}
-			return Time.valueOf(time.substring(0, time.length() - 1));
-		} else if (info == Types.SQL_TIMESTAMP) {
+
+			return converter.convert(mapper, jsonNode);
+		};
+	}
+
+	private Optional<DeserializationRuntimeConverter> createContainerConverter(TypeInformation<?> typeInfo) {
+		if (typeInfo instanceof RowTypeInfo) {
+			return Optional.of(createRowConverter((RowTypeInfo) typeInfo));
+		} else if (typeInfo instanceof ObjectArrayTypeInfo) {
+			return Optional.of(createObjectArrayConverter(((ObjectArrayTypeInfo) typeInfo).getComponentInfo()));
+		} else if (typeInfo instanceof BasicArrayTypeInfo) {
+			return Optional.of(createObjectArrayConverter(((BasicArrayTypeInfo) typeInfo).getComponentInfo()));
+		} else if (isPrimitiveByteArray(typeInfo)) {
+			return Optional.of(createByteArrayConverter());
+		} else {
+			return Optional.empty();
+		}
+	}
+
+	private DeserializationRuntimeConverter createByteArrayConverter() {
+		return (mapper, jsonNode) -> {
+			try {
+				return jsonNode.binaryValue();
+			} catch (IOException e) {
+				throw new WrappingRuntimeException("Unable to deserialize byte array.", e);
+			}
+		};
+	}
+
+	private boolean isPrimitiveByteArray(TypeInformation<?> typeInfo) {
+		return typeInfo instanceof PrimitiveArrayTypeInfo &&
+			((PrimitiveArrayTypeInfo) typeInfo).getComponentType() == Types.BYTE;
+	}
+
+	private DeserializationRuntimeConverter createObjectArrayConverter(TypeInformation elementTypeInfo) {
+		DeserializationRuntimeConverter elementConverter = createConverter(elementTypeInfo);
+		return assembleArrayConverter(elementConverter);
+	}
+
+	private DeserializationRuntimeConverter createRowConverter(RowTypeInfo typeInfo) {
+		List<DeserializationRuntimeConverter> fieldConverters = Arrays.stream(typeInfo.getFieldTypes())
+			.map(this::createConverter)
+			.collect(Collectors.toList());
+
+		return assembleRowConverter(typeInfo.getFieldNames(), fieldConverters);
+	}
+
+	private DeserializationRuntimeConverter createFallbackConverter(Class<?> valueType) {
+		return (mapper, jsonNode) -> {
+			// for types that were specified without JSON schema
+			// e.g. POJOs
+			try {
+				return mapper.treeToValue(jsonNode, valueType);
+			} catch (JsonProcessingException e) {
+				throw new WrappingRuntimeException(format("Could not convert node: %s", jsonNode), e);
+			}
+		};
+	}
+
+	private Optional<DeserializationRuntimeConverter> createConverterForSimpleType(TypeInformation<?> simpleTypeInfo) {
+		if (simpleTypeInfo == Types.VOID) {
+			return Optional.of((mapper, jsonNode) -> null);
+		} else if (simpleTypeInfo == Types.BOOLEAN) {
+			return Optional.of((mapper, jsonNode) -> jsonNode.asBoolean());
+		} else if (simpleTypeInfo == Types.STRING) {
+			return Optional.of((mapper, jsonNode) -> jsonNode.asText());
+		} else if (simpleTypeInfo == Types.INT) {
+			return Optional.of((mapper, jsonNode) -> jsonNode.asInt());
+		} else if (simpleTypeInfo == Types.LONG) {
+			return Optional.of((mapper, jsonNode) -> jsonNode.asLong());
+		} else if (simpleTypeInfo == Types.DOUBLE) {
+			return Optional.of((mapper, jsonNode) -> jsonNode.asDouble());
+		} else if (simpleTypeInfo == Types.FLOAT) {
+			return Optional.of((mapper, jsonNode) -> Float.parseFloat(jsonNode.asText().trim()));
+		} else if (simpleTypeInfo == Types.SHORT) {
+			return Optional.of((mapper, jsonNode) -> Short.parseShort(jsonNode.asText().trim()));
+		} else if (simpleTypeInfo == Types.BYTE) {
+			return Optional.of((mapper, jsonNode) -> Byte.parseByte(jsonNode.asText().trim()));
+		} else if (simpleTypeInfo == Types.BIG_DEC) {
+			return Optional.of((mapper, jsonNode) -> jsonNode.decimalValue());
+		} else if (simpleTypeInfo == Types.BIG_INT) {
+			return Optional.of((mapper, jsonNode) -> jsonNode.bigIntegerValue());
+		} else if (simpleTypeInfo == Types.SQL_DATE) {
+			return Optional.of(createDateConverter());
+		} else if (simpleTypeInfo == Types.SQL_TIME) {
+			return Optional.of(createTimeConverter());
+		} else if (simpleTypeInfo == Types.SQL_TIMESTAMP) {
+			return Optional.of(createTimestampConverter());
+		} else {
+			return Optional.empty();
+		}
+	}
+
+	private DeserializationRuntimeConverter createDateConverter() {
+		return (mapper, jsonNode) -> Date.valueOf(ISO_LOCAL_DATE.parse(jsonNode.asText())
+			.query(TemporalQueries.localDate()));
+	}
+
+	private DeserializationRuntimeConverter createTimestampConverter() {
+		return (mapper, jsonNode) -> {
 			// according to RFC 3339 every date-time must have a timezone;
 			// until we have full timezone support, we only support UTC;
 			// users can parse their time as string as a workaround
-			final String timestamp = node.asText();
-			if (timestamp.indexOf('Z') < 0) {
+			TemporalAccessor parsedTimestamp = RFC3339_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+
+			ZoneOffset zoneOffset = parsedTimestamp.query(TemporalQueries.offset());
+
+			if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0) {
 				throw new IllegalStateException(
 					"Invalid timestamp format. Only a timestamp in UTC timezone is supported yet. " +
 						"Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
 			}
-			return Timestamp.valueOf(timestamp.substring(0, timestamp.length() - 1).replace('T', ' '));
-		} else if (info instanceof RowTypeInfo) {
-			return convertRow(node, (RowTypeInfo) info);
-		} else if (info instanceof ObjectArrayTypeInfo) {
-			return convertObjectArray(node, ((ObjectArrayTypeInfo) info).getComponentInfo());
-		} else if (info instanceof BasicArrayTypeInfo) {
-			return convertObjectArray(node, ((BasicArrayTypeInfo) info).getComponentInfo());
-		} else if (info instanceof PrimitiveArrayTypeInfo &&
-				((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
-			return convertByteArray(node);
-		} else {
-			// for types that were specified without JSON schema
-			// e.g. POJOs
-			try {
-				return objectMapper.treeToValue(node, info.getTypeClass());
-			} catch (JsonProcessingException e) {
-				throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
+
+			LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
+			LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
+
+			return Timestamp.valueOf(LocalDateTime.of(localDate, localTime));
+		};
+	}
+
+	private DeserializationRuntimeConverter createTimeConverter() {
+		return (mapper, jsonNode) -> {
+
+			// according to RFC 3339 every full-time must have a timezone;
+			// until we have full timezone support, we only support UTC;
+			// users can parse their time as string as a workaround
+			TemporalAccessor parsedTime = RFC3339_TIME_FORMAT.parse(jsonNode.asText());
+
+			ZoneOffset zoneOffset = parsedTime.query(TemporalQueries.offset());
+			LocalTime localTime = parsedTime.query(TemporalQueries.localTime());
+
+			if (zoneOffset != null && zoneOffset.getTotalSeconds() != 0 || localTime.getNano() != 0) {
+				throw new IllegalStateException(
+					"Invalid time format. Only a time in UTC timezone without milliseconds is supported yet.");
 			}
-		}
+
+			return Time.valueOf(localTime);
+		};
 	}
 
-	private Row convertRow(JsonNode node, RowTypeInfo info) {
-		final String[] names = info.getFieldNames();
-		final TypeInformation<?>[] types = info.getFieldTypes();
-
-		final Row row = new Row(names.length);
-		for (int i = 0; i < names.length; i++) {
-			final String name = names[i];
-			final JsonNode subNode = node.get(name);
-			if (subNode == null) {
-				if (failOnMissingField) {
-					throw new IllegalStateException(
-						"Could not find field with name '" + name + "'.");
-				} else {
-					row.setField(i, null);
-				}
-			} else {
-				row.setField(i, convert(subNode, types[i]));
+	private DeserializationRuntimeConverter assembleRowConverter(
+		String[] fieldNames,
+		List<DeserializationRuntimeConverter> fieldConverters) {
+		return (mapper, jsonNode) -> {
+			ObjectNode node = (ObjectNode) jsonNode;
+
+			int arity = fieldNames.length;
+			Row row = new Row(arity);
+			for (int i = 0; i < arity; i++) {
+				String fieldName = fieldNames[i];
+				JsonNode field = node.get(fieldName);
+				Object convertField = convertField(mapper, fieldConverters.get(i), fieldName, field);
+				row.setField(i, convertField);
 			}
-		}
 
-		return row;
+			return row;
+		};
 	}
 
-	private Object convertObjectArray(JsonNode node, TypeInformation<?> elementType) {
-		final Object[] array = (Object[]) Array.newInstance(elementType.getTypeClass(), node.size());
-		for (int i = 0; i < node.size(); i++) {
-			array[i] = convert(node.get(i), elementType);
+	private Object convertField(
+		ObjectMapper mapper,
+		DeserializationRuntimeConverter fieldConverter,
+		String fieldName,
+		JsonNode field) {
+		if (field == null) {
+			if (failOnMissingField) {
+				throw new IllegalStateException(
+					"Could not find field with name '" + fieldName + "'.");
+			} else {
+				return null;
+			}
+		} else {
+			return fieldConverter.convert(mapper, field);
 		}
-		return array;
 	}
 
-	private Object convertByteArray(JsonNode node) {
-		try {
-			return node.binaryValue();
-		} catch (IOException e) {
-			throw new RuntimeException("Unable to deserialize byte array.", e);
-		}
+	private DeserializationRuntimeConverter assembleArrayConverter(DeserializationRuntimeConverter elementConverter) {
+		return (mapper, jsonNode) -> {
+			ArrayNode node = (ArrayNode) jsonNode;
+
+			return stream(spliterator(node.elements(), node.size(), 0), false)
+				.map(innerNode -> elementConverter.convert(mapper, innerNode))
+				.toArray();
+		};
 	}
 }
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
index 567bef3..af758b6 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
@@ -58,12 +58,17 @@ public class JsonRowFormatFactory extends TableFormatFactoryBase<Row>
 		final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
 
 		// create and configure
-		final JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema(createTypeInformation(descriptorProperties));
+		final JsonRowDeserializationSchema.Builder schema =
+			new JsonRowDeserializationSchema.Builder(createTypeInformation(descriptorProperties));
 
 		descriptorProperties.getOptionalBoolean(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD)
-				.ifPresent(schema::setFailOnMissingField);
+			.ifPresent(flag -> {
+				if (flag) {
+					schema.failOnMissingField();
+				}
+			});
 
-		return schema;
+		return schema.build();
 	}
 
 	@Override
@@ -71,7 +76,7 @@ public class JsonRowFormatFactory extends TableFormatFactoryBase<Row>
 		final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
 
 		// create and configure
-		return new JsonRowSerializationSchema(createTypeInformation(descriptorProperties));
+		return new JsonRowSerializationSchema.Builder(createTypeInformation(descriptorProperties)).build();
 	}
 
 	private TypeInformation<Row> createTypeInformation(DescriptorProperties descriptorProperties) {
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
index d942062..0aa7151 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
@@ -27,19 +27,32 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.WrappingRuntimeException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
+import java.io.Serializable;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Serialization schema that serializes an object of Flink types into a JSON bytes.
@@ -55,42 +68,61 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
 	private static final long serialVersionUID = -2885556750743978636L;
 
 	/** Type information describing the input type. */
-	private final TypeInformation<Row> typeInfo;
+	private final RowTypeInfo typeInfo;
 
 	/** Object mapper that is used to create output JSON objects. */
 	private final ObjectMapper mapper = new ObjectMapper();
 
-	/** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone, without milliseconds). */
-	private SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss'Z'");
-
-	/** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone). */
-	private SimpleDateFormat timeFormatWithMillis = new SimpleDateFormat("HH:mm:ss.SSS'Z'");
-
-	/** Formatter for RFC 3339-compliant string representation of a timestamp value (with UTC timezone). */
-	private SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+	private final SerializationRuntimeConverter runtimeConverter;
 
 	/** Reusable object node. */
 	private transient ObjectNode node;
 
 	/**
-	 * Creates a JSON serialization schema for the given type information.
-	 *
-	 * @param typeInfo The field names of {@link Row} are used to map to JSON properties.
+	 * @deprecated Use the provided {@link Builder} instead.
 	 */
+	@Deprecated
 	public JsonRowSerializationSchema(TypeInformation<Row> typeInfo) {
+		// TODO make this constructor private in the future
 		Preconditions.checkNotNull(typeInfo, "Type information");
-		this.typeInfo = typeInfo;
+		Preconditions.checkArgument(typeInfo instanceof RowTypeInfo, "Only RowTypeInfo is supported");
+		this.typeInfo = (RowTypeInfo) typeInfo;
+		this.runtimeConverter = createConverter(typeInfo);
 	}
 
 	/**
-	 * Creates a JSON serialization schema for the given JSON schema.
-	 *
-	 * @param jsonSchema JSON schema describing the result type
-	 *
-	 * @see <a href="http://json-schema.org/">http://json-schema.org/</a>
+	 * Builder for {@link JsonRowSerializationSchema}.
 	 */
-	public JsonRowSerializationSchema(String jsonSchema) {
-		this(JsonRowSchemaConverter.convert(jsonSchema));
+	@PublicEvolving
+	public static class Builder {
+
+		private final RowTypeInfo typeInfo;
+
+		/**
+		 * Creates a JSON serialization schema for the given type information.
+		 *
+		 * @param typeInfo Type information describing the result type. The field names of {@link Row}
+		 *                 are used to parse the JSON properties.
+		 */
+		public Builder(TypeInformation<Row> typeInfo) {
+			checkArgument(typeInfo instanceof RowTypeInfo, "Only RowTypeInfo is supported");
+			this.typeInfo = (RowTypeInfo) typeInfo;
+		}
+
+		/**
+		 * Creates a JSON serialization schema for the given JSON schema.
+		 *
+		 * @param jsonSchema JSON schema describing the result type
+		 *
+		 * @see <a href="http://json-schema.org/">http://json-schema.org/</a>
+		 */
+		public Builder(String jsonSchema) {
+			this(JsonRowSchemaConverter.convert(checkNotNull(jsonSchema)));
+		}
+
+		public JsonRowSerializationSchema build() {
+			return new JsonRowSerializationSchema(typeInfo);
+		}
 	}
 
 	@Override
@@ -100,7 +132,7 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
 		}
 
 		try {
-			convertRow(node, (RowTypeInfo) typeInfo, row);
+			runtimeConverter.convert(mapper, node, row);
 			return mapper.writeValueAsBytes(node);
 		} catch (Throwable t) {
 			throw new RuntimeException("Could not serialize row '" + row + "'. " +
@@ -125,102 +157,204 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
 		return Objects.hash(typeInfo);
 	}
 
-	// --------------------------------------------------------------------------------------------
+	/*
+		Runtime converters
+	 */
 
-	private ObjectNode convertRow(ObjectNode reuse, RowTypeInfo info, Row row) {
-		if (reuse == null) {
-			reuse = mapper.createObjectNode();
-		}
-		final String[] fieldNames = info.getFieldNames();
-		final TypeInformation<?>[] fieldTypes = info.getFieldTypes();
+	/**
+	 * Runtime converter that maps between Java objects and corresponding {@link JsonNode}s.
+	 */
+	@FunctionalInterface
+	private interface SerializationRuntimeConverter extends Serializable {
+		JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object object);
+	}
+
+	private SerializationRuntimeConverter createConverter(TypeInformation<?> typeInfo) {
+		SerializationRuntimeConverter baseConverter = createConverterForSimpleType(typeInfo)
+			.orElseGet(() ->
+				createContainerConverter(typeInfo)
+					.orElseGet(this::createFallbackConverter));
+		return wrapIntoNullableConverter(baseConverter);
+	}
+
+	private SerializationRuntimeConverter wrapIntoNullableConverter(SerializationRuntimeConverter converter) {
+		return (mapper, reuse, object) -> {
+			if (object == null) {
+				return mapper.getNodeFactory().nullNode();
+			}
 
-		// validate the row
-		if (row.getArity() != fieldNames.length) {
-			throw new IllegalStateException(String.format(
-				"Number of elements in the row '%s' is different from number of field names: %d", row, fieldNames.length));
+			return converter.convert(mapper, reuse, object);
+		};
+	}
+
+	private Optional<SerializationRuntimeConverter> createContainerConverter(TypeInformation<?> typeInfo) {
+		if (typeInfo instanceof RowTypeInfo) {
+			return Optional.of(createRowConverter((RowTypeInfo) typeInfo));
+		} else if (typeInfo instanceof ObjectArrayTypeInfo) {
+			return Optional.of(createObjectArrayConverter(((ObjectArrayTypeInfo) typeInfo).getComponentInfo()));
+		} else if (typeInfo instanceof BasicArrayTypeInfo) {
+			return Optional.of(createObjectArrayConverter(((BasicArrayTypeInfo) typeInfo).getComponentInfo()));
+		} else if (isPrimitiveByteArray(typeInfo)) {
+			return Optional.of((mapper, reuse, object) -> mapper.getNodeFactory().binaryNode((byte[]) object));
+		} else {
+			return Optional.empty();
 		}
+	}
+
+	private boolean isPrimitiveByteArray(TypeInformation<?> typeInfo) {
+		return typeInfo instanceof PrimitiveArrayTypeInfo &&
+			((PrimitiveArrayTypeInfo) typeInfo).getComponentType() == Types.BYTE;
+	}
+
+	private SerializationRuntimeConverter createObjectArrayConverter(TypeInformation elementTypeInfo) {
+		SerializationRuntimeConverter elementConverter = createConverter(elementTypeInfo);
+		return assembleArrayConverter(elementConverter);
+	}
 
-		for (int i = 0; i < fieldNames.length; i++) {
-			final String name = fieldNames[i];
+	private SerializationRuntimeConverter createRowConverter(RowTypeInfo typeInfo) {
+		List<SerializationRuntimeConverter> fieldConverters = Arrays.stream(typeInfo.getFieldTypes())
+			.map(this::createConverter)
+			.collect(Collectors.toList());
+
+		return assembleRowConverter(typeInfo.getFieldNames(), fieldConverters);
+	}
 
-			final JsonNode fieldConverted = convert(reuse, reuse.get(name), fieldTypes[i], row.getField(i));
-			reuse.set(name, fieldConverted);
+	private SerializationRuntimeConverter createFallbackConverter() {
+		return (mapper, reuse, object) -> {
+			// for types that were specified without JSON schema
+			// e.g. POJOs
+			try {
+				return mapper.valueToTree(object);
+			} catch (IllegalArgumentException e) {
+				throw new WrappingRuntimeException(format("Could not convert object: %s", object), e);
+			}
+		};
+	}
+
+	private Optional<SerializationRuntimeConverter> createConverterForSimpleType(TypeInformation<?> simpleTypeInfo) {
+		if (simpleTypeInfo == Types.VOID) {
+			return Optional.of((mapper, reuse, object) -> mapper.getNodeFactory().nullNode());
+		} else if (simpleTypeInfo == Types.BOOLEAN) {
+			return Optional.of((mapper, reuse, object) -> mapper.getNodeFactory().booleanNode((Boolean) object));
+		} else if (simpleTypeInfo == Types.STRING) {
+			return Optional.of((mapper, reuse, object) -> mapper.getNodeFactory().textNode((String) object));
+		} else if (simpleTypeInfo == Types.INT) {
+			return Optional.of((mapper, reuse, object) -> mapper.getNodeFactory().numberNode((Integer) object));
+		} else if (simpleTypeInfo == Types.LONG) {
+			return Optional.of((mapper, reuse, object) -> mapper.getNodeFactory().numberNode((Long) object));
+		} else if (simpleTypeInfo == Types.DOUBLE) {
+			return Optional.of((mapper, reuse, object) -> mapper.getNodeFactory().numberNode((Double) object));
+		} else if (simpleTypeInfo == Types.FLOAT) {
+			return Optional.of((mapper, reuse, object) -> mapper.getNodeFactory().numberNode((Float) object));
+		} else if (simpleTypeInfo == Types.SHORT) {
+			return Optional.of((mapper, reuse, object) -> mapper.getNodeFactory().numberNode((Short) object));
+		} else if (simpleTypeInfo == Types.BYTE) {
+			return Optional.of((mapper, reuse, object) -> mapper.getNodeFactory().numberNode((Byte) object));
+		} else if (simpleTypeInfo == Types.BIG_DEC) {
+			return Optional.of(createBigDecimalConverter());
+		} else if (simpleTypeInfo == Types.BIG_INT) {
+			return Optional.of(createBigIntegerConverter());
+		} else if (simpleTypeInfo == Types.SQL_DATE) {
+			return Optional.of(createDateConverter());
+		} else if (simpleTypeInfo == Types.SQL_TIME) {
+			return Optional.of(createTimeConverter());
+		} else if (simpleTypeInfo == Types.SQL_TIMESTAMP) {
+			return Optional.of(createTimestampConverter());
+		} else {
+			return Optional.empty();
 		}
+	}
+
+	private SerializationRuntimeConverter createDateConverter() {
+		return (mapper, reuse, object) -> {
+			Date date = (Date) object;
+
+			return mapper.getNodeFactory().textNode(ISO_LOCAL_DATE.format(date.toLocalDate()));
+		};
+	}
+
+	private SerializationRuntimeConverter createTimestampConverter() {
+		return (mapper, reuse, object) -> {
+			Timestamp timestamp = (Timestamp) object;
 
-		return reuse;
+			return mapper.getNodeFactory()
+				.textNode(RFC3339_TIMESTAMP_FORMAT.format(timestamp.toLocalDateTime()));
+		};
 	}
 
-	private JsonNode convert(ContainerNode<?> container, JsonNode reuse, TypeInformation<?> info, Object object) {
-		if (info == Types.VOID || object == null) {
-			return container.nullNode();
-		} else if (info == Types.BOOLEAN) {
-			return container.booleanNode((Boolean) object);
-		} else if (info == Types.STRING) {
-			return container.textNode((String) object);
-		} else if (info == Types.BIG_DEC) {
+	private SerializationRuntimeConverter createTimeConverter() {
+		return (mapper, reuse, object) -> {
+			final Time time = (Time) object;
+
+			JsonNodeFactory nodeFactory = mapper.getNodeFactory();
+			return nodeFactory.textNode(RFC3339_TIME_FORMAT.format(time.toLocalTime()));
+		};
+	}
+
+	private SerializationRuntimeConverter createBigDecimalConverter() {
+		return (mapper, reuse, object) -> {
 			// convert decimal if necessary
+			JsonNodeFactory nodeFactory = mapper.getNodeFactory();
 			if (object instanceof BigDecimal) {
-				return container.numberNode((BigDecimal) object);
+				return nodeFactory.numberNode((BigDecimal) object);
 			}
-			return container.numberNode(BigDecimal.valueOf(((Number) object).doubleValue()));
-		} else if (info == Types.BIG_INT) {
-			// convert integer if necessary
+			return nodeFactory.numberNode(BigDecimal.valueOf(((Number) object).doubleValue()));
+		};
+	}
+
+	private SerializationRuntimeConverter createBigIntegerConverter() {
+		return (mapper, reuse, object) -> {
+			// convert integer if necessary
+			JsonNodeFactory nodeFactory = mapper.getNodeFactory();
 			if (object instanceof BigInteger) {
-				return container.numberNode((BigInteger) object);
+				return nodeFactory.numberNode((BigInteger) object);
 			}
-			return container.numberNode(BigInteger.valueOf(((Number) object).longValue()));
-		} else if (info == Types.SQL_DATE) {
-			return container.textNode(object.toString());
-		} else if (info == Types.SQL_TIME) {
-			final Time time = (Time) object;
-			// strip milliseconds if possible
-			if (time.getTime() % 1000 > 0) {
-				return container.textNode(timeFormatWithMillis.format(time));
-			}
-			return container.textNode(timeFormat.format(time));
-		} else if (info == Types.SQL_TIMESTAMP) {
-			return container.textNode(timestampFormat.format((Timestamp) object));
-		} else if (info instanceof RowTypeInfo) {
-			if (reuse != null && reuse instanceof ObjectNode) {
-				return convertRow((ObjectNode) reuse, (RowTypeInfo) info, (Row) object);
+			return nodeFactory.numberNode(BigInteger.valueOf(((Number) object).longValue()));
+		};
+	}
+
+	private SerializationRuntimeConverter assembleRowConverter(
+			String[] fieldNames,
+			List<SerializationRuntimeConverter> fieldConverters) {
+		return (mapper, reuse, object) -> {
+			ObjectNode node;
+
+			if (reuse == null) {
+				node = mapper.createObjectNode();
 			} else {
-				return convertRow(null, (RowTypeInfo) info, (Row) object);
+				node = (ObjectNode) reuse;
 			}
-		} else if (info instanceof ObjectArrayTypeInfo) {
-			if (reuse != null && reuse instanceof ArrayNode) {
-				return convertObjectArray((ArrayNode) reuse, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
-			} else {
-				return convertObjectArray(null, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
+
+			Row row = (Row) object;
+
+			for (int i = 0; i < fieldNames.length; i++) {
+				String fieldName = fieldNames[i];
+				node.set(fieldName,
+					fieldConverters.get(i).convert(mapper, node.get(fieldNames[i]), row.getField(i)));
 			}
-		} else if (info instanceof BasicArrayTypeInfo) {
-			if (reuse != null && reuse instanceof ArrayNode) {
-				return convertObjectArray((ArrayNode) reuse, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
+
+			return node;
+		};
+	}
+
+	private SerializationRuntimeConverter assembleArrayConverter(SerializationRuntimeConverter elementConverter) {
+		return (mapper, reuse, object) -> {
+			ArrayNode node;
+
+			if (reuse == null) {
+				node = mapper.createArrayNode();
 			} else {
-				return convertObjectArray(null, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
+				node = (ArrayNode) reuse;
+				node.removeAll();
 			}
-		} else if (info instanceof PrimitiveArrayTypeInfo && ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
-			return container.binaryNode((byte[]) object);
-		} else {
-			// for types that were specified without JSON schema
-			// e.g. POJOs
-			try {
-				return mapper.valueToTree(object);
-			} catch (IllegalArgumentException e) {
-				throw new IllegalStateException("Unsupported type information '" + info + "' for object: " + object, e);
-			}
-		}
-	}
 
-	private ArrayNode convertObjectArray(ArrayNode reuse, TypeInformation<?> info, Object[] array) {
-		if (reuse == null) {
-			reuse = mapper.createArrayNode();
-		} else {
-			reuse.removeAll();
-		}
+			Object[] array = (Object[]) object;
 
-		for (Object object : array) {
-			reuse.add(convert(reuse, null, info, object));
-		}
-		return reuse;
+			for (Object element : array) {
+				node.add(elementConverter.convert(mapper, null, element));
+			}
+
+			return node;
+		};
 	}
 }
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
new file mode 100644
index 0000000..c946c5d
--- /dev/null
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+
+/**
+ * Time formats respecting the RFC3339 specification.
+ */
+class TimeFormats {
+
+	/** Formatter for RFC 3339-compliant string representation of a time value. */
+	static final DateTimeFormatter RFC3339_TIME_FORMAT = new DateTimeFormatterBuilder()
+		.appendPattern("HH:mm:ss")
+		.appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+		.appendPattern("'Z'")
+		.toFormatter();
+
+	/** Formatter for RFC 3339-compliant string representation of a timestamp value (with UTC timezone). */
+	static final DateTimeFormatter RFC3339_TIMESTAMP_FORMAT = new DateTimeFormatterBuilder()
+		.append(DateTimeFormatter.ISO_LOCAL_DATE)
+		.appendLiteral('T')
+		.append(RFC3339_TIME_FORMAT)
+		.toFormatter();
+
+	private TimeFormats() {
+	}
+}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
index 5e77b80..0573019 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.json;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.types.Row;
 
@@ -27,15 +28,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.Obje
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.apache.flink.formats.utils.DeserializationSchemaMatcher.whenDeserializedWith;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.internal.matchers.ThrowableCauseMatcher.hasCause;
 
 /**
  * Tests for the {@link JsonRowDeserializationSchema}.
@@ -62,18 +64,18 @@ public class JsonRowDeserializationSchemaTest {
 
 		byte[] serializedJson = objectMapper.writeValueAsBytes(root);
 
-		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(
+		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema.Builder(
 			Types.ROW_NAMED(
-				new String[] { "id", "name", "bytes" },
+				new String[]{"id", "name", "bytes"},
 				Types.LONG, Types.STRING, Types.PRIMITIVE_ARRAY(Types.BYTE))
-		);
+		).build();
 
-		Row deserialized = deserializationSchema.deserialize(serializedJson);
+		Row row = new Row(3);
+		row.setField(0, id);
+		row.setField(1, name);
+		row.setField(2, bytes);
 
-		assertEquals(3, deserialized.getArity());
-		assertEquals(id, deserialized.getField(0));
-		assertEquals(name, deserialized.getField(1));
-		assertArrayEquals(bytes, (byte[]) deserialized.getField(2));
+		assertThat(serializedJson, whenDeserializedWith(deserializationSchema).equalsTo(row));
 	}
 
 	@Test
@@ -103,7 +105,7 @@ public class JsonRowDeserializationSchemaTest {
 
 		final byte[] serializedJson = objectMapper.writeValueAsBytes(root);
 
-		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(
+		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema.Builder(
 			"{" +
 			"    type: 'object'," +
 			"    properties: {" +
@@ -124,9 +126,7 @@ public class JsonRowDeserializationSchemaTest {
 			"             }" +
 			"         }" +
 			"    }" +
-			"}");
-
-		final Row deserialized = deserializationSchema.deserialize(serializedJson);
+			"}").build();
 
 		final Row expected = new Row(10);
 		expected.setField(0, id);
@@ -143,7 +143,7 @@ public class JsonRowDeserializationSchemaTest {
 		nestedRow.setField(1, BigDecimal.valueOf(12));
 		expected.setField(9, nestedRow);
 
-		assertEquals(expected, deserialized);
+		assertThat(serializedJson, whenDeserializedWith(deserializationSchema).equalsTo(expected));
 	}
 
 	/**
@@ -158,25 +158,25 @@ public class JsonRowDeserializationSchemaTest {
 		root.put("id", 123123123);
 		byte[] serializedJson = objectMapper.writeValueAsBytes(root);
 
-		JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(
-			Types.ROW_NAMED(
-				new String[] { "name" },
-				Types.STRING)
-		);
+		TypeInformation<Row> rowTypeInformation = Types.ROW_NAMED(
+			new String[]{"name"},
+			Types.STRING);
 
-		Row row = deserializationSchema.deserialize(serializedJson);
+		JsonRowDeserializationSchema deserializationSchema =
+			new JsonRowDeserializationSchema.Builder(rowTypeInformation)
+				.build();
 
-		assertEquals(1, row.getArity());
-		Assert.assertNull("Missing field not null", row.getField(0));
+		Row row = new Row(1);
+		assertThat(serializedJson,
+			whenDeserializedWith(deserializationSchema).equalsTo(row));
 
-		deserializationSchema.setFailOnMissingField(true);
+		deserializationSchema = new JsonRowDeserializationSchema.Builder(rowTypeInformation)
+			.failOnMissingField()
+			.build();
 
-		try {
-			deserializationSchema.deserialize(serializedJson);
-			Assert.fail("Did not throw expected Exception");
-		} catch (IOException e) {
-			Assert.assertTrue(e.getCause() instanceof IllegalStateException);
-		}
+		assertThat(serializedJson,
+			whenDeserializedWith(deserializationSchema)
+				.failsWithException(hasCause(instanceOf(IllegalStateException.class))));
 	}
 
 	/**
@@ -185,10 +185,10 @@ public class JsonRowDeserializationSchemaTest {
 	@Test
 	public void testNumberOfFieldNamesAndTypesMismatch() {
 		try {
-			new JsonRowDeserializationSchema(
+			new JsonRowDeserializationSchema.Builder(
 				Types.ROW_NAMED(
 					new String[]{"one", "two", "three"},
-					Types.LONG));
+					Types.LONG)).build();
 			Assert.fail("Did not throw expected Exception");
 		} catch (IllegalArgumentException ignored) {
 			// Expected
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java
index caf99f4..47b06c4 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java
@@ -110,8 +110,7 @@ public class JsonRowFormatFactoryTest extends TestLogger {
 		final DeserializationSchema<?> actual2 = TableFactoryService
 			.find(DeserializationSchemaFactory.class, properties)
 			.createDeserializationSchema(properties);
-		final JsonRowDeserializationSchema expected2 = new JsonRowDeserializationSchema(SCHEMA);
-		expected2.setFailOnMissingField(false);
+		final JsonRowDeserializationSchema expected2 = new JsonRowDeserializationSchema.Builder(SCHEMA).build();
 		assertEquals(expected2, actual2);
 	}
 
@@ -119,7 +118,7 @@ public class JsonRowFormatFactoryTest extends TestLogger {
 		final SerializationSchema<?> actual1 = TableFactoryService
 			.find(SerializationSchemaFactory.class, properties)
 			.createSerializationSchema(properties);
-		final SerializationSchema<?> expected1 = new JsonRowSerializationSchema(SCHEMA);
+		final SerializationSchema<?> expected1 = new JsonRowSerializationSchema.Builder(SCHEMA).build();
 		assertEquals(expected1, actual1);
 	}
 
@@ -127,8 +126,9 @@ public class JsonRowFormatFactoryTest extends TestLogger {
 		final DeserializationSchema<?> actual2 = TableFactoryService
 			.find(DeserializationSchemaFactory.class, properties)
 			.createDeserializationSchema(properties);
-		final JsonRowDeserializationSchema expected2 = new JsonRowDeserializationSchema(JSON_SCHEMA);
-		expected2.setFailOnMissingField(true);
+		final JsonRowDeserializationSchema expected2 = new JsonRowDeserializationSchema.Builder(JSON_SCHEMA)
+			.failOnMissingField()
+			.build();
 		assertEquals(expected2, actual2);
 	}
 
@@ -136,7 +136,7 @@ public class JsonRowFormatFactoryTest extends TestLogger {
 		final SerializationSchema<?> actual1 = TableFactoryService
 			.find(SerializationSchemaFactory.class, properties)
 			.createSerializationSchema(properties);
-		final SerializationSchema<?> expected1 = new JsonRowSerializationSchema(JSON_SCHEMA);
+		final SerializationSchema<?> expected1 = new JsonRowSerializationSchema.Builder(JSON_SCHEMA).build();
 		assertEquals(expected1, actual1);
 	}
 
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
index e2410d4..cc1f5bf 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
@@ -30,7 +30,10 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.flink.formats.utils.SerializationSchemaMatcher.whenSerializedWith;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 /**
  * Tests for the {@link JsonRowSerializationSchema}.
@@ -38,7 +41,7 @@ import static org.junit.Assert.assertEquals;
 public class JsonRowSerializationSchemaTest {
 
 	@Test
-	public void testRowSerialization() throws IOException {
+	public void testRowSerialization() {
 		final TypeInformation<Row> rowSchema = Types.ROW_NAMED(
 			new String[] {"f1", "f2", "f3"},
 			Types.INT, Types.BOOLEAN, Types.STRING);
@@ -48,8 +51,14 @@ public class JsonRowSerializationSchemaTest {
 		row.setField(1, true);
 		row.setField(2, "str");
 
-		final Row resultRow = serializeAndDeserialize(rowSchema, row);
-		assertEquals(row, resultRow);
+		final JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema.Builder(rowSchema)
+			.build();
+		final JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema.Builder(rowSchema)
+			.build();
+
+		assertThat(row, whenSerializedWith(serializationSchema)
+			.andDeserializedWith(deserializationSchema)
+			.equalsTo(row));
 	}
 
 	@Test
@@ -63,8 +72,10 @@ public class JsonRowSerializationSchemaTest {
 		row1.setField(1, true);
 		row1.setField(2, "str");
 
-		final JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
-		final JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(rowSchema);
+		final JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema.Builder(rowSchema)
+			.build();
+		final JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema.Builder(rowSchema)
+			.build();
 
 		byte[] bytes = serializationSchema.serialize(row1);
 		assertEquals(row1, deserializationSchema.deserialize(bytes));
@@ -79,7 +90,7 @@ public class JsonRowSerializationSchemaTest {
 	}
 
 	@Test
-	public void testNestedSchema() throws IOException {
+	public void testNestedSchema() {
 		final TypeInformation<Row> rowSchema = Types.ROW_NAMED(
 			new String[] {"f1", "f2", "f3"},
 			Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.DOUBLE));
@@ -92,25 +103,32 @@ public class JsonRowSerializationSchemaTest {
 		nested.setField(1, 2.3);
 		row.setField(2, nested);
 
-		final Row resultRow = serializeAndDeserialize(rowSchema, row);
-		assertEquals(row, resultRow);
+		final JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema.Builder(rowSchema)
+			.build();
+		final JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema.Builder(rowSchema)
+			.build();
+
+		assertThat(row, whenSerializedWith(serializationSchema)
+			.andDeserializedWith(deserializationSchema)
+			.equalsTo(row));
 	}
 
-	@Test(expected = RuntimeException.class)
+	@Test
 	public void testSerializeRowWithInvalidNumberOfFields() {
 		final TypeInformation<Row> rowSchema = Types.ROW_NAMED(
-			new String[] {"f1", "f2", "f3"},
+			new String[]{"f1", "f2", "f3"},
 			Types.INT, Types.BOOLEAN, Types.STRING);
 
 		final Row row = new Row(1);
 		row.setField(0, 1);
 
-		final JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
-		serializationSchema.serialize(row);
+		final JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema.Builder(rowSchema)
+			.build();
+		assertThat(row, whenSerializedWith(serializationSchema).failsWithException(instanceOf(RuntimeException.class)));
 	}
 
 	@Test
-	public void testSchema() throws IOException {
+	public void testSchema() {
 		final TypeInformation<Row> rowSchema = JsonRowSchemaConverter.convert(
 			"{" +
 			"    type: 'object'," +
@@ -157,17 +175,14 @@ public class JsonRowSerializationSchemaTest {
 		nestedRow.setField(1, BigDecimal.valueOf(12));
 		row.setField(10, nestedRow);
 
-		final Row resultRow = serializeAndDeserialize(rowSchema, row);
-		assertEquals(row, resultRow);
-	}
-
-	// --------------------------------------------------------------------------------------------
+		final JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema.Builder(rowSchema)
+			.build();
+		final JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema.Builder(rowSchema)
+			.build();
 
-	private Row serializeAndDeserialize(TypeInformation<Row> rowSchema, Row row) throws IOException {
-		final JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
-		final JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(rowSchema);
-
-		final byte[] bytes = serializationSchema.serialize(row);
-		return deserializationSchema.deserialize(bytes);
+		assertThat(row, whenSerializedWith(serializationSchema)
+			.andDeserializedWith(deserializationSchema)
+			.equalsTo(row));
 	}
+
 }
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/utils/DeserializationSchemaMatcher.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/utils/DeserializationSchemaMatcher.java
new file mode 100644
index 0000000..c5b19e7
--- /dev/null
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/utils/DeserializationSchemaMatcher.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.utils;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.types.Row;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.apache.flink.util.InstantiationUtil.deserializeObject;
+import static org.apache.flink.util.InstantiationUtil.serializeObject;
+
+/**
+ * Matcher that provides a common way for asserting results of {@link DeserializationSchema}. It takes into account
+ * e.g. the fact that deserialization schema during runtime might be used after serializing it over a wire. Usage:
+ *
+ * <ul>
+ * <li>when asserting for result after deserializing a row
+ * <pre>{@code
+ *      byte[] jsonBytes = ...
+ *      Row expectedRow = ...
+ *      final JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema.Builder(rowSchema).build();
+ *
+ *      assertThat(jsonBytes, whenDeserializedWith(deserializationSchema)
+ *          .equalsTo(expectedRow));
+ * }</pre>
+ * </li>
+ *
+ * <li>to check if an exception is thrown during deserialization:
+ * <pre>{@code
+ *      assertThat(serializedJson,
+ *          whenDeserializedWith(deserializationSchema)
+ *              .failsWithException(hasCause(instanceOf(IllegalStateException.class))));
+ * }</pre>
+ * </li>
+ * </ul>
+ */
+public abstract class DeserializationSchemaMatcher extends TypeSafeMatcher<byte[]> {
+
+	final DeserializationSchema<Row> deserializationSchema;
+
+	private DeserializationSchemaMatcher(DeserializationSchema<Row> deserializationSchema) {
+		this.deserializationSchema = deserializationSchema;
+	}
+
+	public static DeserializationSchemaMatcherBuilder whenDeserializedWith(DeserializationSchema<Row> deserializationSchema) {
+		return new DeserializationSchemaMatcherBuilder(deserializationSchema);
+	}
+
+	private static class DeserializationSchemaResultMatcher extends DeserializationSchemaMatcher {
+
+		private final Row expected;
+
+		private DeserializationSchemaResultMatcher(
+			DeserializationSchema<Row> deserializationSchema,
+			Row expected) {
+			super(deserializationSchema);
+
+			this.expected = expected;
+		}
+
+		@Override
+		protected boolean matchesSafely(byte[] item) {
+			try {
+				return Objects.deepEquals(deserializationSchema.deserialize(item), expected);
+			} catch (IOException e) {
+				throw new AssertionError("Could not deserialize", e);
+			}
+		}
+
+		@Override
+		public void describeTo(Description description) {
+			description.appendValue(expected);
+		}
+	}
+
+	private static class DeserializationSchemaExceptionMatcher extends DeserializationSchemaMatcher {
+
+		private final Matcher<? extends Throwable> exceptionMatcher;
+		private Throwable thrownException = null;
+
+		private DeserializationSchemaExceptionMatcher(
+			DeserializationSchema<Row> deserializationSchema,
+			Matcher<? extends Throwable> exceptionMatcher) {
+			super(deserializationSchema);
+			this.exceptionMatcher = exceptionMatcher;
+		}
+
+		@Override
+		protected boolean matchesSafely(byte[] item) {
+			try {
+				deserializationSchema.deserialize(item);
+			} catch (IOException e) {
+				thrownException = e;
+			}
+			return exceptionMatcher.matches(thrownException);
+		}
+
+		@Override
+		public void describeTo(Description description) {
+			exceptionMatcher.describeTo(description);
+		}
+
+		@Override
+		protected void describeMismatchSafely(byte[] item, Description mismatchDescription) {
+			exceptionMatcher.describeMismatch(thrownException, mismatchDescription);
+		}
+	}
+
+	/**
+	 * Builder for {@link DeserializationSchemaMatcher}.
+	 */
+	public static class DeserializationSchemaMatcherBuilder {
+
+		private DeserializationSchema<Row> deserializationSchema;
+
+		private DeserializationSchemaMatcherBuilder(DeserializationSchema<Row> deserializationSchema) {
+			try {
+				// we serialize and deserialize the schema to test runtime behavior
+				// when the schema is shipped to the cluster
+				this.deserializationSchema = deserializeObject(
+					serializeObject(deserializationSchema),
+					this.getClass().getClassLoader());
+			} catch (IOException | ClassNotFoundException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		public DeserializationSchemaMatcher equalsTo(Row row) {
+			return new DeserializationSchemaResultMatcher(
+				deserializationSchema,
+				row
+			);
+		}
+
+		public DeserializationSchemaMatcher failsWithException(Matcher<? extends Throwable> exceptionMatcher) {
+			return new DeserializationSchemaExceptionMatcher(
+				deserializationSchema,
+				exceptionMatcher
+			);
+		}
+	}
+}
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/utils/SerializationSchemaMatcher.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/utils/SerializationSchemaMatcher.java
new file mode 100644
index 0000000..4cda5dd
--- /dev/null
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/utils/SerializationSchemaMatcher.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.utils;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.apache.flink.util.InstantiationUtil.deserializeObject;
+import static org.apache.flink.util.InstantiationUtil.serializeObject;
+
+/**
+ * Matcher that provides a common way for asserting results of {@link SerializationSchema}. It takes into account
+ * e.g. the fact that serialization schema during runtime might be used after serializing and deserializing it over
+ * a wire. Usage:
+ *
+ * <ul>
+ * <li>when asserting for result after serializing and deserializing a row
+ * <pre>{@code
+ *      final JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
+ *      final JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(rowSchema);
+ *
+ *      assertThat(row, whenSerializedWith(serializationSchema)
+ *          .andDeserializedWith(deserializationSchema)
+ *          .equalsTo(row));
+ * }</pre>
+ * </li>
+ *
+ * <li>to check if an exception is thrown during serialization:
+ * <pre>{@code
+ *      assertThat(row, whenSerializedWith(serializationSchema).failsWithException(instanceOf(RuntimeException.class)));
+ * }</pre>
+ * </li>
+ * </ul>
+ */
+public abstract class SerializationSchemaMatcher extends TypeSafeMatcher<Row> {
+
+	final SerializationSchema<Row> serializationSchema;
+
+	/**
+	 * Stores the schema that the concrete matchers use to serialize the rows under test.
+	 *
+	 * @param serializationSchema schema used for serializing matched rows
+	 */
+	private SerializationSchemaMatcher(SerializationSchema<Row> serializationSchema) {
+		this.serializationSchema = serializationSchema;
+	}
+
+	/**
+	 * Entry point of the fluent API: starts building a matcher for the given serialization schema.
+	 *
+	 * @param serializationSchema schema that the resulting matcher serializes rows with
+	 * @return builder for choosing the kind of assertion
+	 */
+	public static SerializationSchemaMatcherBuilder whenSerializedWith(SerializationSchema<Row> serializationSchema) {
+		return new SerializationSchemaMatcherBuilder(serializationSchema);
+	}
+
+	/**
+	 * Matcher that serializes a row, deserializes the resulting bytes again and compares the outcome
+	 * with an expected row.
+	 */
+	private static class SerializationSchemaResultMatcher extends SerializationSchemaMatcher {
+
+		private final Row expected;
+		private final DeserializationSchema<Row> deserializationSchema;
+
+		// outcome of the most recent round trip; only kept for the mismatch description
+		private Row actual = null;
+
+		private SerializationSchemaResultMatcher(
+				SerializationSchema<Row> serializationSchema,
+				DeserializationSchema<Row> deserializationSchema,
+				Row expected) {
+			super(serializationSchema);
+
+			this.expected = expected;
+			this.deserializationSchema = deserializationSchema;
+		}
+
+		/**
+		 * Serializes and deserializes the given row and compares the result with the expected row.
+		 *
+		 * @param item row to send through both schemas
+		 * @return true if the round-trip result deep-equals the expected row
+		 * @throws AssertionError if the deserialization fails with an {@link IOException}
+		 */
+		@Override
+		protected boolean matchesSafely(Row item) {
+			actual = null;
+			try {
+				actual = deserializationSchema.deserialize(serializationSchema.serialize(item));
+			} catch (IOException e) {
+				throw new AssertionError("Could not deserialize", e);
+			}
+			return Objects.deepEquals(actual, expected);
+		}
+
+		@Override
+		public void describeTo(Description description) {
+			description.appendValue(expected);
+		}
+
+		/**
+		 * Reports the row produced by the round trip. The matched item is only the input of the
+		 * round trip, so printing it alone would not show what was compared with the expected row.
+		 */
+		@Override
+		protected void describeMismatchSafely(Row item, Description mismatchDescription) {
+			mismatchDescription
+				.appendText("serializing and deserializing ")
+				.appendValue(item)
+				.appendText(" produced ")
+				.appendValue(actual);
+		}
+	}
+
+	/**
+	 * Matcher that serializes a row and checks the exception raised by the schema, if any, against
+	 * a nested exception matcher.
+	 */
+	private static class SerializationSchemaExceptionMatcher extends SerializationSchemaMatcher {
+
+		private final Matcher<? extends Throwable> exceptionMatcher;
+		// exception recorded by the most recent matchesSafely call; null if none was thrown
+		private Throwable thrownException = null;
+
+		private SerializationSchemaExceptionMatcher(
+				SerializationSchema<Row> serializationSchema,
+				Matcher<? extends Throwable> exceptionMatcher) {
+			super(serializationSchema);
+			this.exceptionMatcher = exceptionMatcher;
+		}
+
+		/**
+		 * Serializes the given row and evaluates the exception matcher against the outcome.
+		 *
+		 * @param item row to serialize
+		 * @return result of the exception matcher for the recorded exception, or for {@code null}
+		 *     if the serialization succeeded
+		 */
+		@Override
+		protected boolean matchesSafely(Row item) {
+			// forget the outcome of a previous invocation so that a reused matcher
+			// is never evaluated against a stale exception
+			thrownException = null;
+			try {
+				serializationSchema.serialize(item);
+			} catch (Exception e) {
+				thrownException = e;
+			}
+			return exceptionMatcher.matches(thrownException);
+		}
+
+		@Override
+		public void describeTo(Description description) {
+			exceptionMatcher.describeTo(description);
+		}
+
+		@Override
+		protected void describeMismatchSafely(Row item, Description mismatchDescription) {
+			if (thrownException == null) {
+				// nothing was recorded, so there is no exception the nested matcher could describe
+				mismatchDescription.appendText("no exception was thrown");
+				return;
+			}
+			exceptionMatcher.describeMismatch(thrownException, mismatchDescription);
+		}
+	}
+
+	/**
+	 * Builder for {@link SerializationSchemaMatcher} that can assert results after serialize and deserialize.
+	 *
+	 * <p>Both schemas are passed through {@code serializeObject} and {@code deserializeObject} once on
+	 * construction; the matchers created by this builder work on the resulting copies.
+	 */
+	public static class SerializationWithDeserializationSchemaMatcherBuilder {
+
+		private final SerializationSchema<Row> serializationSchema;
+		private final DeserializationSchema<Row> deserializationSchema;
+
+		private SerializationWithDeserializationSchemaMatcherBuilder(
+				SerializationSchema<Row> serializationSchema,
+				DeserializationSchema<Row> deserializationSchema) {
+			try {
+				// we serialize and deserialize the schema to test runtime behavior
+				// when the schema is shipped to the cluster
+				this.serializationSchema = deserializeObject(
+					serializeObject(serializationSchema),
+					this.getClass().getClassLoader());
+				this.deserializationSchema = deserializeObject(
+					serializeObject(deserializationSchema),
+					this.getClass().getClassLoader());
+			} catch (IOException | ClassNotFoundException e) {
+				throw new RuntimeException("Could not serialize and deserialize the given schemas.", e);
+			}
+		}
+
+		/**
+		 * Creates a matcher that expects a row to be equal to the given row after it was serialized
+		 * and deserialized again.
+		 *
+		 * @param expected expected result of the round trip
+		 * @return matcher for rows
+		 */
+		public SerializationSchemaMatcher equalsTo(Row expected) {
+			return new SerializationSchemaResultMatcher(
+				serializationSchema,
+				deserializationSchema,
+				expected
+			);
+		}
+	}
+
+	/**
+	 * Builder for {@link SerializationSchemaMatcher}.
+	 *
+	 * <p>The schema handed to the builder is passed through {@code serializeObject} and
+	 * {@code deserializeObject} once on construction, so that {@link #failsWithException(Matcher)}
+	 * also exercises a copy of the schema instead of the original instance.
+	 */
+	public static class SerializationSchemaMatcherBuilder {
+
+		private final SerializationSchema<Row> serializationSchema;
+
+		private SerializationSchemaMatcherBuilder(SerializationSchema<Row> serializationSchema) {
+			try {
+				// we serialize and deserialize the schema to test runtime behavior
+				// when the schema is shipped to the cluster
+				this.serializationSchema = deserializeObject(
+					serializeObject(serializationSchema),
+					this.getClass().getClassLoader());
+			} catch (IOException | ClassNotFoundException e) {
+				throw new RuntimeException("Could not serialize and deserialize the serialization schema.", e);
+			}
+		}
+
+		/**
+		 * Continues with a builder for assertions on the result of serializing and deserializing a row.
+		 *
+		 * @param deserializationSchema schema used for reading the serialized row back
+		 * @return builder for round-trip assertions
+		 */
+		public SerializationWithDeserializationSchemaMatcherBuilder andDeserializedWith(DeserializationSchema<Row> deserializationSchema) {
+			return new SerializationWithDeserializationSchemaMatcherBuilder(serializationSchema, deserializationSchema);
+		}
+
+		/**
+		 * Creates a matcher that checks the exception thrown during serialization.
+		 *
+		 * @param exceptionMatcher matcher applied to the thrown exception
+		 * @return matcher for rows
+		 */
+		public SerializationSchemaMatcher failsWithException(Matcher<? extends Throwable> exceptionMatcher) {
+			return new SerializationSchemaExceptionMatcher(
+				serializationSchema,
+				exceptionMatcher
+			);
+		}
+	}
+}