Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 09:24:51 UTC

[06/13] flink git commit: [FLINK-8558] [table] Add unified format interfaces and separate formats from connectors

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowFormatFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowFormatFactoryTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowFormatFactoryTest.java
new file mode 100644
index 0000000..4f5e218
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowFormatFactoryTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.table.descriptors.Avro;
+import org.apache.flink.table.descriptors.Descriptor;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.formats.DeserializationSchemaFactory;
+import org.apache.flink.table.formats.SerializationSchemaFactory;
+import org.apache.flink.table.formats.TableFormatFactoryService;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link AvroRowFormatFactory}.
+ */
+public class AvroRowFormatFactoryTest extends TestLogger {
+
+	private static final Class<User> AVRO_SPECIFIC_RECORD = User.class;
+
+	private static final String AVRO_SCHEMA = User.getClassSchema().toString();
+
+	@Test
+	public void testRecordClass() {
+		final Map<String, String> properties = toMap(new Avro().recordClass(AVRO_SPECIFIC_RECORD));
+
+		testRecordClassDeserializationSchema(properties);
+
+		testRecordClassSerializationSchema(properties);
+	}
+
+	@Test
+	public void testAvroSchema() {
+		final Map<String, String> properties = toMap(new Avro().avroSchema(AVRO_SCHEMA));
+
+		testAvroSchemaSerializationSchema(properties);
+
+		testAvroSchemaDeserializationSchema(properties);
+	}
+
+	private void testRecordClassDeserializationSchema(Map<String, String> properties) {
+		final DeserializationSchema<?> actual2 = TableFormatFactoryService
+			.find(DeserializationSchemaFactory.class, properties)
+			.createDeserializationSchema(properties);
+		final AvroRowDeserializationSchema expected2 = new AvroRowDeserializationSchema(AVRO_SPECIFIC_RECORD);
+		assertEquals(expected2, actual2);
+	}
+
+	private void testRecordClassSerializationSchema(Map<String, String> properties) {
+		final SerializationSchema<?> actual1 = TableFormatFactoryService
+			.find(SerializationSchemaFactory.class, properties)
+			.createSerializationSchema(properties);
+		final SerializationSchema<?> expected1 = new AvroRowSerializationSchema(AVRO_SPECIFIC_RECORD);
+		assertEquals(expected1, actual1);
+	}
+
+	private void testAvroSchemaDeserializationSchema(Map<String, String> properties) {
+		final DeserializationSchema<?> actual2 = TableFormatFactoryService
+			.find(DeserializationSchemaFactory.class, properties)
+			.createDeserializationSchema(properties);
+		final AvroRowDeserializationSchema expected2 = new AvroRowDeserializationSchema(AVRO_SCHEMA);
+		assertEquals(expected2, actual2);
+	}
+
+	private void testAvroSchemaSerializationSchema(Map<String, String> properties) {
+		final SerializationSchema<?> actual1 = TableFormatFactoryService
+			.find(SerializationSchemaFactory.class, properties)
+			.createSerializationSchema(properties);
+		final SerializationSchema<?> expected1 = new AvroRowSerializationSchema(AVRO_SCHEMA);
+		assertEquals(expected1, actual1);
+	}
+
+	private static Map<String, String> toMap(Descriptor... desc) {
+		final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+		for (Descriptor d : desc) {
+			d.addProperties(descriptorProperties);
+		}
+		return descriptorProperties.asMap();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index df52851..dc8a116 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -38,6 +38,7 @@ import java.lang.reflect.Array;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.Objects;
 
 /**
  * Deserialization schema from JSON to Flink types.
@@ -84,7 +85,7 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 	 * @see <a href="http://json-schema.org/">http://json-schema.org/</a>
 	 */
 	public JsonRowDeserializationSchema(String jsonSchema) {
-		this(JsonSchemaConverter.convert(jsonSchema));
+		this(JsonRowSchemaConverter.convert(jsonSchema));
 	}
 
 	@Override
@@ -118,6 +119,23 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
 		this.failOnMissingField = failOnMissingField;
 	}
 
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		final JsonRowDeserializationSchema that = (JsonRowDeserializationSchema) o;
+		return failOnMissingField == that.failOnMissingField && Objects.equals(typeInfo, that.typeInfo);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(typeInfo, failOnMissingField);
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	private Object convert(JsonNode node, TypeInformation<?> info) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
new file mode 100644
index 0000000..fd7bda6
--- /dev/null
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptorValidator;
+import org.apache.flink.table.descriptors.JsonValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.formats.DeserializationSchemaFactory;
+import org.apache.flink.table.formats.SerializationSchemaFactory;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Table format factory for providing configured instances of JSON-to-row {@link SerializationSchema}
+ * and {@link DeserializationSchema}.
+ */
+public class JsonRowFormatFactory implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> {
+
+	@Override
+	public Map<String, String> requiredContext() {
+		final Map<String, String> context = new HashMap<>();
+		context.put(FormatDescriptorValidator.FORMAT_TYPE(), JsonValidator.FORMAT_TYPE_VALUE);
+		context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION(), "1");
+		return context;
+	}
+
+	@Override
+	public boolean supportsSchemaDerivation() {
+		return true;
+	}
+
+	@Override
+	public List<String> supportedProperties() {
+		final List<String> properties = new ArrayList<>();
+		properties.add(JsonValidator.FORMAT_JSON_SCHEMA);
+		properties.add(JsonValidator.FORMAT_SCHEMA);
+		properties.add(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD);
+		properties.add(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA());
+		properties.addAll(SchemaValidator.getSchemaDerivationKeys());
+		return properties;
+	}
+
+	@Override
+	public DeserializationSchema<Row> createDeserializationSchema(Map<String, String> properties) {
+		final DescriptorProperties descriptorProperties = validateAndGetProperties(properties);
+
+		// create and configure
+		final JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema(createTypeInformation(descriptorProperties));
+
+		descriptorProperties.getOptionalBoolean(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD)
+				.ifPresent(schema::setFailOnMissingField);
+
+		return schema;
+	}
+
+	@Override
+	public SerializationSchema<Row> createSerializationSchema(Map<String, String> properties) {
+		final DescriptorProperties descriptorProperties = validateAndGetProperties(properties);
+
+		// create and configure
+		return new JsonRowSerializationSchema(createTypeInformation(descriptorProperties));
+	}
+
+	private static DescriptorProperties validateAndGetProperties(Map<String, String> propertiesMap) {
+		final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+		descriptorProperties.putProperties(propertiesMap);
+
+		// validate
+		new JsonValidator().validate(descriptorProperties);
+
+		return descriptorProperties;
+	}
+
+	private static TypeInformation<Row> createTypeInformation(DescriptorProperties descriptorProperties) {
+		if (descriptorProperties.containsKey(JsonValidator.FORMAT_SCHEMA)) {
+			return (RowTypeInfo) descriptorProperties.getType(JsonValidator.FORMAT_SCHEMA);
+		} else if (descriptorProperties.containsKey(JsonValidator.FORMAT_JSON_SCHEMA)) {
+			return JsonRowSchemaConverter.convert(descriptorProperties.getString(JsonValidator.FORMAT_JSON_SCHEMA));
+		} else {
+			return SchemaValidator.deriveFormatFields(descriptorProperties).toRowType();
+		}
+	}
+}
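
For context, this factory is not meant to be instantiated directly: a connector hands a
property map to TableFormatFactoryService, which matches it against requiredContext()
and supportedProperties(). Below is a minimal sketch of that lookup; the literal
property keys ("format.type", "format.property-version", "format.json-schema") are
assumptions standing in for the validator constants used above:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.table.formats.DeserializationSchemaFactory;
    import org.apache.flink.table.formats.TableFormatFactoryService;

    public class JsonFormatLookupSketch {
        public static void main(String[] args) {
            // properties as a table descriptor would produce them (keys are assumptions)
            final Map<String, String> properties = new HashMap<>();
            properties.put("format.type", "json");
            properties.put("format.property-version", "1");
            properties.put("format.json-schema",
                "{ type: 'object', properties: { name: { type: 'string' } } }");

            // discover a matching factory on the classpath and let it create the schema
            final DeserializationSchema<?> schema = TableFormatFactoryService
                .find(DeserializationSchemaFactory.class, properties)
                .createDeserializationSchema(properties);
            System.out.println(schema.getClass().getName());
        }
    }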

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
new file mode 100644
index 0000000..320ca1f
--- /dev/null
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Converts a JSON schema into Flink's type information. It uses {@link Row} for representing
+ * objects and tuple arrays.
+ *
+ * <p>Note: This converter implements just a subset of the JSON schema specification.
+ * Union types (as well as "allOf", "anyOf", "not") are not supported yet. Simple
+ * references that link to a common definition in the document are supported. "oneOf" and
+ * arrays of types are only supported for specifying nullability.
+ *
+ * <p>This converter has been developed for JSON Schema draft-07 but also includes keywords of
+ * older drafts to be as compatible as possible.
+ */
+public final class JsonRowSchemaConverter {
+
+	private JsonRowSchemaConverter() {
+		// private
+	}
+
+	// see https://spacetelescope.github.io/understanding-json-schema/UnderstandingJSONSchema.pdf
+	private static final String PROPERTIES = "properties";
+	private static final String ADDITIONAL_PROPERTIES = "additionalProperties";
+	private static final String TYPE = "type";
+	private static final String FORMAT = "format";
+	private static final String CONTENT_ENCODING = "contentEncoding";
+	private static final String ITEMS = "items";
+	private static final String ADDITIONAL_ITEMS = "additionalItems";
+	private static final String REF = "$ref";
+	private static final String ALL_OF = "allOf";
+	private static final String ANY_OF = "anyOf";
+	private static final String NOT = "not";
+	private static final String ONE_OF = "oneOf";
+
+	// from https://tools.ietf.org/html/draft-zyp-json-schema-03#page-14
+	private static final String DISALLOW = "disallow";
+	private static final String EXTENDS = "extends";
+
+	private static final String TYPE_NULL = "null";
+	private static final String TYPE_BOOLEAN = "boolean";
+	private static final String TYPE_OBJECT = "object";
+	private static final String TYPE_ARRAY = "array";
+	private static final String TYPE_NUMBER = "number";
+	private static final String TYPE_INTEGER = "integer";
+	private static final String TYPE_STRING = "string";
+
+	private static final String FORMAT_DATE = "date";
+	private static final String FORMAT_TIME = "time";
+	private static final String FORMAT_DATE_TIME = "date-time";
+
+	private static final String CONTENT_ENCODING_BASE64 = "base64";
+
+	/**
+	 * Converts a JSON schema into Flink's type information. Throws an exception if the schema
+	 * cannot be converted because of a loss of precision or a too flexible schema.
+	 *
+	 * <p>The converter can resolve simple schema references to solve those cases where entities
+	 * are defined at the beginning and then used throughout a document.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T> TypeInformation<T> convert(String jsonSchema) {
+		Preconditions.checkNotNull(jsonSchema, "JSON schema");
+		final ObjectMapper mapper = new ObjectMapper();
+		mapper.getFactory()
+			.enable(JsonParser.Feature.ALLOW_COMMENTS)
+			.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES)
+			.enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);
+		final JsonNode node;
+		try {
+			node = mapper.readTree(jsonSchema);
+		} catch (IOException e) {
+			throw new IllegalArgumentException(
+				"Invalid JSON schema.", e);
+		}
+		return (TypeInformation<T>) convertType("<root>", node, node);
+	}
+
+	private static TypeInformation<?> convertType(String location, JsonNode node, JsonNode root) {
+		// we use a set here to unify types (e.g. types that just add metadata such as 'multipleOf')
+		final Set<TypeInformation<?>> typeSet = new HashSet<>();
+
+		// search for ref
+		final Optional<JsonNode> ref;
+		if (node.has(REF) && node.get(REF).isTextual()) {
+			// try a simple ref resolver to solve those cases where entities are defined at
+			// the beginning and then used throughout a document
+			ref = Optional.of(resolveReference(node.get(REF).asText(), node, root));
+		} else {
+			ref = Optional.empty();
+		}
+
+		// use TYPE of this node
+		if (node.has(TYPE)) {
+			final JsonNode typeNode = node.get(TYPE);
+
+			List<String> types = new ArrayList<>();
+			// array of types
+			if (typeNode.isArray()) {
+				final Iterator<JsonNode> elements = typeNode.elements();
+				while (elements.hasNext()) {
+					types.add(elements.next().asText());
+				}
+			}
+			// single type
+			else if (typeNode.isTextual()) {
+				types.add(typeNode.asText());
+			}
+
+			for (String type : types) {
+				// set field type
+				switch (type) {
+					case TYPE_NULL:
+						typeSet.add(Types.VOID);
+						break;
+					case TYPE_BOOLEAN:
+						typeSet.add(Types.BOOLEAN);
+						break;
+					case TYPE_STRING:
+						if (node.has(FORMAT)) {
+							typeSet.add(convertStringFormat(location, node.get(FORMAT)));
+						} else if (node.has(CONTENT_ENCODING)) {
+							typeSet.add(convertStringEncoding(location, node.get(CONTENT_ENCODING)));
+						} else {
+							typeSet.add(Types.STRING);
+						}
+						break;
+					case TYPE_NUMBER:
+						typeSet.add(Types.BIG_DEC);
+						break;
+					case TYPE_INTEGER:
+						// use BigDecimal for easier interoperability
+						// without affecting the correctness of the result
+						typeSet.add(Types.BIG_DEC);
+						break;
+					case TYPE_OBJECT:
+						typeSet.add(convertObject(location, node, root));
+						break;
+					case TYPE_ARRAY:
+						typeSet.add(convertArray(location, node, root));
+						break;
+					default:
+						throw new IllegalArgumentException(
+							"Unsupported type '" + node.get(TYPE).asText() + "' in node: " + location);
+				}
+			}
+		}
+		// use TYPE of reference as fallback if present
+		else {
+			ref.filter(r -> r.has(TYPE)).ifPresent(r -> typeSet.add(convertType(node.get(REF).asText(), r, root)));
+		}
+
+		// simple interpretation of ONE_OF for supporting "object or null"
+		if (node.has(ONE_OF) && node.get(ONE_OF).isArray()) {
+			final TypeInformation<?>[] types = convertTypes(location + '/' + ONE_OF, node.get(ONE_OF), root);
+			typeSet.addAll(Arrays.asList(types));
+		}
+		// use ONE_OF of reference as fallback
+		else if (ref.isPresent() && ref.get().has(ONE_OF) && ref.get().get(ONE_OF).isArray()) {
+			final TypeInformation<?>[] types = convertTypes(node.get(REF).asText() + '/' + ONE_OF, ref.get().get(ONE_OF), root);
+			typeSet.addAll(Arrays.asList(types));
+		}
+
+		// validate no union types or extending
+		if (node.has(ALL_OF) || node.has(ANY_OF) || node.has(NOT) || node.has(EXTENDS) || node.has(DISALLOW)) {
+			throw new IllegalArgumentException(
+				"Union types such as '" + ALL_OF + "', '" + ANY_OF + "', etc. " +
+					"and extending are not supported yet.");
+		}
+
+		// only a single type (optionally in union with null) is supported so far
+		final List<TypeInformation<?>> types = new ArrayList<>(typeSet);
+		if (types.size() == 0) {
+			throw new IllegalArgumentException("No type could be found in node: " + location);
+		} else if (types.size() > 2 || (types.size() == 2 && !types.contains(Types.VOID))) {
+			throw new IllegalArgumentException(
+				"Union types with more than just a null type are not supported yet.");
+		}
+
+		// return the first non-void type or void
+		if (types.size() == 2 && types.get(0) == Types.VOID) {
+			return types.get(1);
+		} else {
+			return types.get(0);
+		}
+	}
+
+	private static TypeInformation<Row> convertObject(String location, JsonNode node, JsonNode root) {
+		// validate properties
+		if (!node.has(PROPERTIES)) {
+			return Types.ROW();
+		}
+		if (!node.isObject()) {
+			throw new IllegalArgumentException(
+				"Invalid '" + PROPERTIES + "' property for object type in node: " + location);
+		}
+		final JsonNode props = node.get(PROPERTIES);
+		final String[] names = new String[props.size()];
+		final TypeInformation<?>[] types = new TypeInformation[props.size()];
+
+		final Iterator<Map.Entry<String, JsonNode>> fieldIter = props.fields();
+		int i = 0;
+		while (fieldIter.hasNext()) {
+			final Map.Entry<String, JsonNode> subNode = fieldIter.next();
+
+			// set field name
+			names[i] = subNode.getKey();
+
+			// set type
+			types[i] = convertType(location + '/' + subNode.getKey(), subNode.getValue(), root);
+
+			i++;
+		}
+
+		// validate that object does not contain additional properties
+		if (node.has(ADDITIONAL_PROPERTIES) && node.get(ADDITIONAL_PROPERTIES).isBoolean() &&
+				node.get(ADDITIONAL_PROPERTIES).asBoolean()) {
+			throw new IllegalArgumentException(
+				"An object must not allow additional properties in node: " + location);
+		}
+
+		return Types.ROW_NAMED(names, types);
+	}
+
+	private static TypeInformation<?> convertArray(String location, JsonNode node, JsonNode root) {
+		// validate items
+		if (!node.has(ITEMS)) {
+			throw new IllegalArgumentException(
+				"Arrays must specify an '" + ITEMS + "' property in node: " + location);
+		}
+		final JsonNode items = node.get(ITEMS);
+
+		// list (translated to object array)
+		if (items.isObject()) {
+			final TypeInformation<?> elementType = convertType(
+				location + '/' + ITEMS,
+				items,
+				root);
+			// result type might either be ObjectArrayTypeInfo or BasicArrayTypeInfo for Strings
+			return Types.OBJECT_ARRAY(elementType);
+		}
+		// tuple (translated to row)
+		else if (items.isArray()) {
+			final TypeInformation<?>[] types = convertTypes(location + '/' + ITEMS, items, root);
+
+			// validate that array does not contain additional items
+			if (node.has(ADDITIONAL_ITEMS) && node.get(ADDITIONAL_ITEMS).isBoolean() &&
+					node.get(ADDITIONAL_ITEMS).asBoolean()) {
+				throw new IllegalArgumentException(
+					"An array tuple must not allow additional items in node: " + location);
+			}
+
+			return Types.ROW(types);
+		}
+		throw new IllegalArgumentException(
+			"Invalid type for '" + ITEMS + "' property in node: " + location);
+	}
+
+	private static TypeInformation<?> convertStringFormat(String location, JsonNode node) {
+		if (!node.isTextual()) {
+			throw new IllegalArgumentException("Invalid '" + FORMAT + "' property in node: " + location);
+		}
+
+		switch (node.asText()) {
+			case FORMAT_DATE:
+				return Types.SQL_DATE;
+			case FORMAT_TIME:
+				return Types.SQL_TIME;
+			case FORMAT_DATE_TIME:
+				return Types.SQL_TIMESTAMP;
+			default:
+				return Types.STRING; // unlikely that we will support other formats in the future
+		}
+	}
+
+	private static TypeInformation<?> convertStringEncoding(String location, JsonNode node) {
+		if (!node.isTextual()) {
+			throw new IllegalArgumentException("Invalid '" + CONTENT_ENCODING + "' property in node: " + location);
+		}
+
+		// "If the instance value is a string, this property defines that the string SHOULD
+		// be interpreted as binary data and decoded using the encoding named by this property."
+
+		switch (node.asText()) {
+			case CONTENT_ENCODING_BASE64:
+				return Types.PRIMITIVE_ARRAY(Types.BYTE);
+			default:
+				// we fail hard here:
+				// this gives us the chance to support more encodings in the future without problems
+				// of backwards compatibility
+				throw new IllegalArgumentException("Invalid encoding '" + node.asText() + "' in node: " + location);
+		}
+	}
+
+	private static JsonNode resolveReference(String ref, JsonNode origin, JsonNode root) {
+		if (!ref.startsWith("#")) {
+			throw new IllegalArgumentException("Only JSON schemas with simple references " +
+				"(one indirection in the same document) are supported so far. But was: " + ref);
+		}
+		final String path = ref.substring(1);
+		final JsonNode foundNode = root.at(path);
+		if (foundNode.isMissingNode()) {
+			throw new IllegalArgumentException("Could not find reference: " + ref);
+		}
+		// prevent obvious cyclic references
+		if (foundNode == origin) {
+			throw new IllegalArgumentException("Cyclic references are not supported: " + ref);
+		}
+		return foundNode;
+	}
+
+	private static TypeInformation<?>[] convertTypes(String location, JsonNode arrayNode, JsonNode root) {
+		final TypeInformation<?>[] types = new TypeInformation[arrayNode.size()];
+		final Iterator<JsonNode> elements = arrayNode.elements();
+		int i = 0;
+		while (elements.hasNext()) {
+			final TypeInformation<?> elementType = convertType(
+				location + '[' + i + ']',
+				elements.next(),
+				root);
+			types[i] = elementType;
+			i += 1;
+		}
+		return types;
+	}
+}
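
To make the converter's contract concrete, a small usage sketch; the comments show
what the resulting type is expected to be, based on the mappings above:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.formats.json.JsonRowSchemaConverter;
    import org.apache.flink.types.Row;

    public class SchemaConvertSketch {
        public static void main(String[] args) {
            // the converter tolerates comments, single quotes, and unquoted field names
            final TypeInformation<Row> rowType = JsonRowSchemaConverter.convert(
                "{ type: 'object', properties: { " +
                "  name: { type: 'string' }, " +
                "  time: { type: 'string', format: 'date-time' } } }");
            // objects map to ROW_NAMED; 'date-time' strings map to SQL_TIMESTAMP
            System.out.println(rowType); // expected to render as Row(name: String, time: Timestamp)
        }
    }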

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
index 8fee6a4..d942062 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
@@ -39,6 +39,7 @@ import java.math.BigInteger;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
+import java.util.Objects;
 
 /**
 * Serialization schema that serializes an object of Flink types into JSON bytes.
@@ -89,7 +90,7 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
 	 * @see <a href="http://json-schema.org/">http://json-schema.org/</a>
 	 */
 	public JsonRowSerializationSchema(String jsonSchema) {
-		this(JsonSchemaConverter.convert(jsonSchema));
+		this(JsonRowSchemaConverter.convert(jsonSchema));
 	}
 
 	@Override
@@ -107,6 +108,23 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
 		}
 	}
 
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		final JsonRowSerializationSchema that = (JsonRowSerializationSchema) o;
+		return Objects.equals(typeInfo, that.typeInfo);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(typeInfo);
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	private ObjectNode convertRow(ObjectNode reuse, RowTypeInfo info, Row row) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java
deleted file mode 100644
index 7a001f6..0000000
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * Converts a JSON schema into Flink's type information. It uses {@link Row} for representing
- * objects and tuple arrays.
- *
- * <p>Note: This converter implements just a subset of the JSON schema specification.
- * Union types (as well as "allOf", "anyOf", "not") are not supported yet. Simple
- * references that link to a common definition in the document are supported. "oneOf" and
- * arrays of types are only supported for specifying nullability.
- *
- * <p>This converter has been developed for JSON Schema draft-07 but also includes keywords of
- * older drafts to be as compatible as possible.
- */
-public final class JsonSchemaConverter {
-
-	private JsonSchemaConverter() {
-		// private
-	}
-
-	// see https://spacetelescope.github.io/understanding-json-schema/UnderstandingJSONSchema.pdf
-	private static final String PROPERTIES = "properties";
-	private static final String ADDITIONAL_PROPERTIES = "additionalProperties";
-	private static final String TYPE = "type";
-	private static final String FORMAT = "format";
-	private static final String CONTENT_ENCODING = "contentEncoding";
-	private static final String ITEMS = "items";
-	private static final String ADDITIONAL_ITEMS = "additionalItems";
-	private static final String REF = "$ref";
-	private static final String ALL_OF = "allOf";
-	private static final String ANY_OF = "anyOf";
-	private static final String NOT = "not";
-	private static final String ONE_OF = "oneOf";
-
-	// from https://tools.ietf.org/html/draft-zyp-json-schema-03#page-14
-	private static final String DISALLOW = "disallow";
-	private static final String EXTENDS = "extends";
-
-	private static final String TYPE_NULL = "null";
-	private static final String TYPE_BOOLEAN = "boolean";
-	private static final String TYPE_OBJECT = "object";
-	private static final String TYPE_ARRAY = "array";
-	private static final String TYPE_NUMBER = "number";
-	private static final String TYPE_INTEGER = "integer";
-	private static final String TYPE_STRING = "string";
-
-	private static final String FORMAT_DATE = "date";
-	private static final String FORMAT_TIME = "time";
-	private static final String FORMAT_DATE_TIME = "date-time";
-
-	private static final String CONTENT_ENCODING_BASE64 = "base64";
-
-	/**
-	 * Converts a JSON schema into Flink's type information. Throws an exception if the schema
-	 * cannot converted because of loss of precision or too flexible schema.
-	 *
-	 * <p>The converter can resolve simple schema references to solve those cases where entities
-	 * are defined at the beginning and then used throughout a document.
-	 */
-	@SuppressWarnings("unchecked")
-	public static <T> TypeInformation<T> convert(String jsonSchema) {
-		Preconditions.checkNotNull(jsonSchema, "JSON schema");
-		final ObjectMapper mapper = new ObjectMapper();
-		mapper.getFactory()
-			.enable(JsonParser.Feature.ALLOW_COMMENTS)
-			.enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES)
-			.enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);
-		final JsonNode node;
-		try {
-			node = mapper.readTree(jsonSchema);
-		} catch (IOException e) {
-			throw new IllegalArgumentException(
-				"Invalid JSON schema.", e);
-		}
-		return (TypeInformation<T>) convertType("<root>", node, node);
-	}
-
-	private static TypeInformation<?> convertType(String location, JsonNode node, JsonNode root) {
-		// we use a set here to unify types (e.g. types that just add metadata such as 'multipleOf')
-		final Set<TypeInformation<?>> typeSet = new HashSet<>();
-
-		// search for ref
-		final Optional<JsonNode> ref;
-		if (node.has(REF) && node.get(REF).isTextual()) {
-			// try a simple ref resolver to solve those cases where entities are defined at
-			// the beginning and then used throughout a document
-			ref = Optional.of(resolveReference(node.get(REF).asText(), node, root));
-		} else {
-			ref = Optional.empty();
-		}
-
-		// use TYPE of this node
-		if (node.has(TYPE)) {
-			final JsonNode typeNode = node.get(TYPE);
-
-			List<String> types = new ArrayList<>();
-			// array of types
-			if (typeNode.isArray()) {
-				final Iterator<JsonNode> elements = typeNode.elements();
-				while (elements.hasNext()) {
-					types.add(elements.next().asText());
-				}
-			}
-			// single type
-			else if (typeNode.isTextual()) {
-				types.add(typeNode.asText());
-			}
-
-			for (String type : types) {
-				// set field type
-				switch (type) {
-					case TYPE_NULL:
-						typeSet.add(Types.VOID);
-						break;
-					case TYPE_BOOLEAN:
-						typeSet.add(Types.BOOLEAN);
-						break;
-					case TYPE_STRING:
-						if (node.has(FORMAT)) {
-							typeSet.add(convertStringFormat(location, node.get(FORMAT)));
-						} else if (node.has(CONTENT_ENCODING)) {
-							typeSet.add(convertStringEncoding(location, node.get(CONTENT_ENCODING)));
-						} else {
-							typeSet.add(Types.STRING);
-						}
-						break;
-					case TYPE_NUMBER:
-						typeSet.add(Types.BIG_DEC);
-						break;
-					case TYPE_INTEGER:
-						// use BigDecimal for easier interoperability
-						// without affecting the correctness of the result
-						typeSet.add(Types.BIG_DEC);
-						break;
-					case TYPE_OBJECT:
-						typeSet.add(convertObject(location, node, root));
-						break;
-					case TYPE_ARRAY:
-						typeSet.add(convertArray(location, node, root));
-						break;
-					default:
-						throw new IllegalArgumentException(
-							"Unsupported type '" + node.get(TYPE).asText() + "' in node: " + location);
-				}
-			}
-		}
-		// use TYPE of reference as fallback if present
-		else {
-			ref.filter(r -> r.has(TYPE)).ifPresent(r -> typeSet.add(convertType(node.get(REF).asText(), r, root)));
-		}
-
-		// simple interpretation of ONE_OF for supporting "object or null"
-		if (node.has(ONE_OF) && node.get(ONE_OF).isArray()) {
-			final TypeInformation<?>[] types = convertTypes(location + '/' + ONE_OF, node.get(ONE_OF), root);
-			typeSet.addAll(Arrays.asList(types));
-		}
-		// use ONE_OF of reference as fallback
-		else if (ref.isPresent() && ref.get().has(ONE_OF) && ref.get().get(ONE_OF).isArray()) {
-			final TypeInformation<?>[] types = convertTypes(node.get(REF).asText() + '/' + ONE_OF, ref.get().get(ONE_OF), root);
-			typeSet.addAll(Arrays.asList(types));
-		}
-
-		// validate no union types or extending
-		if (node.has(ALL_OF) || node.has(ANY_OF) || node.has(NOT) || node.has(EXTENDS) || node.has(DISALLOW)) {
-			throw new IllegalArgumentException(
-				"Union types are such as '" + ALL_OF + "', '" + ANY_OF + "' etc. " +
-					"and extending are not supported yet.");
-		}
-
-		// only a type (with null) is supported yet
-		final List<TypeInformation<?>> types = new ArrayList<>(typeSet);
-		if (types.size() == 0) {
-			throw new IllegalArgumentException("No type could be found in node:" + location);
-		} else if (types.size() > 2 || (types.size() == 2 && !types.contains(Types.VOID))) {
-			throw new IllegalArgumentException(
-				"Union types with more than just a null type are not supported yet.");
-		}
-
-		// return the first non-void type or void
-		if (types.size() == 2 && types.get(0) == Types.VOID) {
-			return types.get(1);
-		} else {
-			return types.get(0);
-		}
-	}
-
-	private static TypeInformation<Row> convertObject(String location, JsonNode node, JsonNode root) {
-		// validate properties
-		if (!node.has(PROPERTIES)) {
-			return Types.ROW();
-		}
-		if (!node.isObject()) {
-			throw new IllegalArgumentException(
-				"Invalid '" + PROPERTIES + "' property for object type in node: " + location);
-		}
-		final JsonNode props = node.get(PROPERTIES);
-		final String[] names = new String[props.size()];
-		final TypeInformation<?>[] types = new TypeInformation[props.size()];
-
-		final Iterator<Map.Entry<String, JsonNode>> fieldIter = props.fields();
-		int i = 0;
-		while (fieldIter.hasNext()) {
-			final Map.Entry<String, JsonNode> subNode = fieldIter.next();
-
-			// set field name
-			names[i] = subNode.getKey();
-
-			// set type
-			types[i] = convertType(location + '/' + subNode.getKey(), subNode.getValue(), root);
-
-			i++;
-		}
-
-		// validate that object does not contain additional properties
-		if (node.has(ADDITIONAL_PROPERTIES) && node.get(ADDITIONAL_PROPERTIES).isBoolean() &&
-				node.get(ADDITIONAL_PROPERTIES).asBoolean()) {
-			throw new IllegalArgumentException(
-				"An object must not allow additional properties in node: " + location);
-		}
-
-		return Types.ROW_NAMED(names, types);
-	}
-
-	private static TypeInformation<?> convertArray(String location, JsonNode node, JsonNode root) {
-		// validate items
-		if (!node.has(ITEMS)) {
-			throw new IllegalArgumentException(
-				"Arrays must specify an '" + ITEMS + "' property in node: " + location);
-		}
-		final JsonNode items = node.get(ITEMS);
-
-		// list (translated to object array)
-		if (items.isObject()) {
-			final TypeInformation<?> elementType = convertType(
-				location + '/' + ITEMS,
-				items,
-				root);
-			// result type might either be ObjectArrayTypeInfo or BasicArrayTypeInfo for Strings
-			return Types.OBJECT_ARRAY(elementType);
-		}
-		// tuple (translated to row)
-		else if (items.isArray()) {
-			final TypeInformation<?>[] types = convertTypes(location + '/' + ITEMS, items, root);
-
-			// validate that array does not contain additional items
-			if (node.has(ADDITIONAL_ITEMS) && node.get(ADDITIONAL_ITEMS).isBoolean() &&
-					node.get(ADDITIONAL_ITEMS).asBoolean()) {
-				throw new IllegalArgumentException(
-					"An array tuple must not allow additional items in node: " + location);
-			}
-
-			return Types.ROW(types);
-		}
-		throw new IllegalArgumentException(
-			"Invalid type for '" + ITEMS + "' property in node: " + location);
-	}
-
-	private static TypeInformation<?> convertStringFormat(String location, JsonNode node) {
-		if (!node.isTextual()) {
-			throw new IllegalArgumentException("Invalid '" + FORMAT + "' property in node: " + location);
-		}
-
-		switch (node.asText()) {
-			case FORMAT_DATE:
-				return Types.SQL_DATE;
-			case FORMAT_TIME:
-				return Types.SQL_TIME;
-			case FORMAT_DATE_TIME:
-				return Types.SQL_TIMESTAMP;
-			default:
-				return Types.STRING; // unlikely that we will support other formats in the future
-		}
-	}
-
-	private static TypeInformation<?> convertStringEncoding(String location, JsonNode node) {
-		if (!node.isTextual()) {
-			throw new IllegalArgumentException("Invalid '" + CONTENT_ENCODING + "' property in node: " + location);
-		}
-
-		// "If the instance value is a string, this property defines that the string SHOULD
-		// be interpreted as binary data and decoded using the encoding named by this property."
-
-		switch (node.asText()) {
-			case CONTENT_ENCODING_BASE64:
-				return Types.PRIMITIVE_ARRAY(Types.BYTE);
-			default:
-				// we fail hard here:
-				// this gives us the chance to support more encodings in the future without problems
-				// of backwards compatibility
-				throw new IllegalArgumentException("Invalid encoding '" + node.asText() + "' in node: " + location);
-		}
-	}
-
-	private static JsonNode resolveReference(String ref, JsonNode origin, JsonNode root) {
-		if (!ref.startsWith("#")) {
-			throw new IllegalArgumentException("Only JSON schemes with simple references " +
-				"(one indirection in the same document) are supported yet. But was: " + ref);
-		}
-		final String path = ref.substring(1);
-		final JsonNode foundNode = root.at(path);
-		if (foundNode.isMissingNode()) {
-			throw new IllegalArgumentException("Could not find reference: " + ref);
-		}
-		// prevent obvious cyclic references
-		if (foundNode == origin) {
-			throw new IllegalArgumentException("Cyclic references are not supported:" + ref);
-		}
-		return foundNode;
-	}
-
-	private static TypeInformation<?>[] convertTypes(String location, JsonNode arrayNode, JsonNode root) {
-		final TypeInformation<?>[] types = new TypeInformation[arrayNode.size()];
-		final Iterator<JsonNode> elements = arrayNode.elements();
-		int i = 0;
-		while (elements.hasNext()) {
-			final TypeInformation<?> elementType = convertType(
-				location + '[' + i + ']',
-				elements.next(),
-				root);
-			types[i] = elementType;
-			i += 1;
-		}
-		return types;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
index 9c12191..035f05f 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.descriptors;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.typeutils.TypeStringUtils;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
@@ -81,7 +82,7 @@ public class Json extends FormatDescriptor {
 	 *
 	 * @param schemaType type information that describes the schema
 	 */
-	public Json schema(TypeInformation<?> schemaType) {
+	public Json schema(TypeInformation<Row> schemaType) {
 		Preconditions.checkNotNull(schemaType);
 		this.schema = TypeStringUtils.writeTypeInfo(schemaType);
 		this.jsonSchema = null;
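
The change above tightens schema(...) from TypeInformation<?> to TypeInformation<Row>.
A quick sketch of the three mutually exclusive ways of declaring the JSON format,
mirroring the descriptor calls in the tests below:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.table.api.Types;
    import org.apache.flink.table.descriptors.Json;

    public class JsonDescriptorSketch {
        public static void main(String[] args) {
            // 1) explicit Flink row schema (the Row bound is now enforced at compile time)
            final Json bySchema = new Json().schema(Types.ROW(
                new String[]{"name", "count"},
                new TypeInformation[]{Types.STRING(), Types.INT()}));

            // 2) JSON schema string
            final Json byJsonSchema = new Json().jsonSchema(
                "{ type: 'object', properties: { name: { type: 'string' } } }");

            // 3) derive the format fields from the table schema
            final Json derived = new Json().deriveSchema();
        }
    }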

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
index fea7cf5..49e1abc 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
@@ -45,7 +45,7 @@ public class JsonValidator extends FormatDescriptorValidator {
 		} else if (!deriveSchema && !hasSchema && !hasSchemaString) {
 			throw new ValidationException("A definition of a schema or JSON schema is required.");
 		} else if (hasSchema) {
-			properties.validateType(FORMAT_SCHEMA, false);
+			properties.validateType(FORMAT_SCHEMA, true, false);
 		} else if (hasSchemaString) {
 			properties.validateString(FORMAT_JSON_SCHEMA, false, 1);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory b/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory
new file mode 100644
index 0000000..aec5846
--- /dev/null
+++ b/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.formats.json.JsonRowFormatFactory
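
This service entry is a standard Java ServiceLoader registration, which is how
TableFormatFactoryService can locate the new factory at runtime. A minimal sketch of
the underlying discovery mechanism, assuming TableFormatFactory is the service
interface named by the file:

    import java.util.ServiceLoader;

    import org.apache.flink.table.formats.TableFormatFactory;

    public class FactoryDiscoverySketch {
        public static void main(String[] args) {
            // iterates over all factories registered via META-INF/services on the classpath
            for (TableFormatFactory factory : ServiceLoader.load(TableFormatFactory.class)) {
                System.out.println(factory.getClass().getName());
            }
        }
    }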

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java
new file mode 100644
index 0000000..d763b90
--- /dev/null
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.descriptors.Descriptor;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Json;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.formats.DeserializationSchemaFactory;
+import org.apache.flink.table.formats.SerializationSchemaFactory;
+import org.apache.flink.table.formats.TableFormatFactoryService;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link JsonRowFormatFactory}.
+ */
+public class JsonRowFormatFactoryTest extends TestLogger {
+
+	private static final String JSON_SCHEMA =
+		"{" +
+		"  'title': 'Fruit'," +
+		"  'type': 'object'," +
+		"  'properties': {" +
+		"    'name': {" +
+		"      'type': 'string'" +
+		"    }," +
+		"    'count': {" +
+		"      'type': 'integer'" +
+		"    }," +
+		"    'time': {" +
+		"      'description': 'row time'," +
+		"      'type': 'string'," +
+		"      'format': 'date-time'" +
+		"    }" +
+		"  }," +
+		"  'required': ['name', 'count', 'time']" +
+		"}";
+
+	private static final TypeInformation<Row> SCHEMA = Types.ROW(
+		new String[]{"field1", "field2"},
+		new TypeInformation[]{Types.BOOLEAN(), Types.INT()});
+
+	@Test
+	public void testSchema() {
+		final Map<String, String> properties = toMap(
+			new Json()
+				.schema(SCHEMA)
+				.failOnMissingField(false));
+
+		testSchemaSerializationSchema(properties);
+
+		testSchemaDeserializationSchema(properties);
+	}
+
+	@Test
+	public void testJsonSchema() {
+		final Map<String, String> properties = toMap(
+			new Json()
+				.jsonSchema(JSON_SCHEMA)
+				.failOnMissingField(true));
+
+		testJsonSchemaSerializationSchema(properties);
+
+		testJsonSchemaDeserializationSchema(properties);
+	}
+
+	@Test
+	public void testSchemaDerivation() {
+		final Map<String, String> properties = toMap(
+			new Schema()
+				.field("field1", Types.BOOLEAN())
+				.field("field2", Types.INT())
+				.field("proctime", Types.SQL_TIMESTAMP()).proctime(),
+			new Json()
+				.deriveSchema());
+
+		testSchemaSerializationSchema(properties);
+
+		testSchemaDeserializationSchema(properties);
+	}
+
+	private void testSchemaDeserializationSchema(Map<String, String> properties) {
+		final DeserializationSchema<?> actual2 = TableFormatFactoryService
+			.find(DeserializationSchemaFactory.class, properties)
+			.createDeserializationSchema(properties);
+		final JsonRowDeserializationSchema expected2 = new JsonRowDeserializationSchema(SCHEMA);
+		expected2.setFailOnMissingField(false);
+		assertEquals(expected2, actual2);
+	}
+
+	private void testSchemaSerializationSchema(Map<String, String> properties) {
+		final SerializationSchema<?> actual1 = TableFormatFactoryService
+			.find(SerializationSchemaFactory.class, properties)
+			.createSerializationSchema(properties);
+		final SerializationSchema<?> expected1 = new JsonRowSerializationSchema(SCHEMA);
+		assertEquals(expected1, actual1);
+	}
+
+	private void testJsonSchemaDeserializationSchema(Map<String, String> properties) {
+		final DeserializationSchema<?> actual2 = TableFormatFactoryService
+			.find(DeserializationSchemaFactory.class, properties)
+			.createDeserializationSchema(properties);
+		final JsonRowDeserializationSchema expected2 = new JsonRowDeserializationSchema(JSON_SCHEMA);
+		expected2.setFailOnMissingField(true);
+		assertEquals(expected2, actual2);
+	}
+
+	private void testJsonSchemaSerializationSchema(Map<String, String> properties) {
+		final SerializationSchema<?> actual1 = TableFormatFactoryService
+			.find(SerializationSchemaFactory.class, properties)
+			.createSerializationSchema(properties);
+		final SerializationSchema<?> expected1 = new JsonRowSerializationSchema(JSON_SCHEMA);
+		assertEquals(expected1, actual1);
+	}
+
+	private static Map<String, String> toMap(Descriptor... desc) {
+		final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+		for (Descriptor d : desc) {
+			d.addProperties(descriptorProperties);
+		}
+		return descriptorProperties.asMap();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSchemaConverterTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSchemaConverterTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSchemaConverterTest.java
new file mode 100644
index 0000000..1af45f4
--- /dev/null
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSchemaConverterTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link JsonRowSchemaConverter}.
+ */
+public class JsonRowSchemaConverterTest {
+
+	@Test
+	public void testComplexSchema() throws Exception {
+		final URL url = getClass().getClassLoader().getResource("complex-schema.json");
+		Objects.requireNonNull(url);
+		final String schema = FileUtils.readFileUtf8(new File(url.getFile()));
+		final TypeInformation<?> result = JsonRowSchemaConverter.convert(schema);
+
+		final TypeInformation<?> expected = Types.ROW_NAMED(
+			new String[] {"fn", "familyName", "additionalName", "tuples", "honorificPrefix", "url",
+				"email", "tel", "sound", "org"},
+			Types.STRING, Types.STRING, Types.BOOLEAN, Types.ROW(Types.BIG_DEC, Types.STRING, Types.STRING, Types.STRING),
+			Types.OBJECT_ARRAY(Types.STRING), Types.STRING, Types.ROW_NAMED(new String[] {"type", "value"}, Types.STRING, Types.STRING),
+			Types.ROW_NAMED(new String[] {"type", "value"}, Types.BIG_DEC, Types.STRING), Types.VOID,
+			Types.ROW_NAMED(new String[] {"organizationUnit"}, Types.ROW()));
+
+		assertEquals(expected, result);
+	}
+
+	@Test
+	public void testReferenceSchema() throws Exception {
+		final URL url = getClass().getClassLoader().getResource("reference-schema.json");
+		Objects.requireNonNull(url);
+		final String schema = FileUtils.readFileUtf8(new File(url.getFile()));
+		final TypeInformation<?> result = JsonRowSchemaConverter.convert(schema);
+
+		final TypeInformation<?> expected = Types.ROW_NAMED(
+			new String[] {"billing_address", "shipping_address", "optional_address"},
+			Types.ROW_NAMED(new String[] {"street_address", "city", "state"}, Types.STRING, Types.STRING, Types.STRING),
+			Types.ROW_NAMED(new String[] {"street_address", "city", "state"}, Types.STRING, Types.STRING, Types.STRING),
+			Types.ROW_NAMED(new String[] {"street_address", "city", "state"}, Types.STRING, Types.STRING, Types.STRING));
+
+		assertEquals(expected, result);
+	}
+
+	@Test
+	public void testAtomicType() {
+		final TypeInformation<?> result = JsonRowSchemaConverter.convert("{ type: 'number' }");
+
+		assertEquals(Types.BIG_DEC, result);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testMissingType() {
+		JsonRowSchemaConverter.convert("{ }");
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testWrongType() {
+		JsonRowSchemaConverter.convert("{ type: 'whatever' }");
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testArrayWithAdditionalItems() {
+		JsonRowSchemaConverter.convert("{ type: 'array', items: [{type: 'integer'}], additionalItems: true }");
+	}
+
+	@Test
+	public void testMissingProperties() {
+		final TypeInformation<?> result = JsonRowSchemaConverter.convert("{ type: 'object' }");
+
+		assertEquals(Types.ROW(), result);
+	}
+
+	@Test
+	public void testNullUnionTypes() {
+		final TypeInformation<?> result = JsonRowSchemaConverter.convert("{ type: ['string', 'null'] }");
+
+		assertEquals(Types.STRING, result);
+	}
+
+	@Test
+	public void testTimestamp() {
+		final TypeInformation<?> result = JsonRowSchemaConverter.convert("{ type: 'string', format: 'date-time' }");
+
+		assertEquals(Types.SQL_TIMESTAMP, result);
+	}
+}

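For quick reference, a minimal sketch of calling the renamed converter directly
(the example class is hypothetical; the expected results simply mirror the
assertions in the test above):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.formats.json.JsonRowSchemaConverter;

    public class JsonSchemaConversionExample {
        public static void main(String[] args) {
            // an atomic JSON schema maps to a single type information
            TypeInformation<?> numberType =
                JsonRowSchemaConverter.convert("{ type: 'number' }");
            System.out.println(numberType.equals(Types.BIG_DEC)); // true

            // a ['string', 'null'] union collapses to the non-null member
            TypeInformation<?> unionType =
                JsonRowSchemaConverter.convert("{ type: ['string', 'null'] }");
            System.out.println(unionType.equals(Types.STRING)); // true
        }
    }
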
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
index 94a05b3..e2410d4 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
@@ -111,7 +111,7 @@ public class JsonRowSerializationSchemaTest {
 
 	@Test
 	public void testSchema() throws IOException {
-		final TypeInformation<Row> rowSchema = JsonSchemaConverter.convert(
+		final TypeInformation<Row> rowSchema = JsonRowSchemaConverter.convert(
 			"{" +
 			"    type: 'object'," +
 			"    properties: {" +

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSchemaConverterTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSchemaConverterTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSchemaConverterTest.java
deleted file mode 100644
index 7cf3b7c..0000000
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSchemaConverterTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.util.FileUtils;
-
-import org.junit.Test;
-
-import java.io.File;
-import java.net.URL;
-import java.util.Objects;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for {@link JsonSchemaConverter}.
- */
-public class JsonSchemaConverterTest {
-
-	@Test
-	public void testComplexSchema() throws Exception {
-		final URL url = getClass().getClassLoader().getResource("complex-schema.json");
-		Objects.requireNonNull(url);
-		final String schema = FileUtils.readFileUtf8(new File(url.getFile()));
-		final TypeInformation<?> result = JsonSchemaConverter.convert(schema);
-
-		final TypeInformation<?> expected = Types.ROW_NAMED(
-			new String[] {"fn", "familyName", "additionalName", "tuples", "honorificPrefix", "url",
-				"email", "tel", "sound", "org"},
-			Types.STRING, Types.STRING, Types.BOOLEAN, Types.ROW(Types.BIG_DEC, Types.STRING, Types.STRING, Types.STRING),
-			Types.OBJECT_ARRAY(Types.STRING), Types.STRING, Types.ROW_NAMED(new String[] {"type", "value"}, Types.STRING, Types.STRING),
-			Types.ROW_NAMED(new String[] {"type", "value"}, Types.BIG_DEC, Types.STRING), Types.VOID,
-			Types.ROW_NAMED(new String[] {"organizationUnit"}, Types.ROW()));
-
-		assertEquals(expected, result);
-	}
-
-	@Test
-	public void testReferenceSchema() throws Exception {
-		final URL url = getClass().getClassLoader().getResource("reference-schema.json");
-		Objects.requireNonNull(url);
-		final String schema = FileUtils.readFileUtf8(new File(url.getFile()));
-		final TypeInformation<?> result = JsonSchemaConverter.convert(schema);
-
-		final TypeInformation<?> expected = Types.ROW_NAMED(
-			new String[] {"billing_address", "shipping_address", "optional_address"},
-			Types.ROW_NAMED(new String[] {"street_address", "city", "state"}, Types.STRING, Types.STRING, Types.STRING),
-			Types.ROW_NAMED(new String[] {"street_address", "city", "state"}, Types.STRING, Types.STRING, Types.STRING),
-			Types.ROW_NAMED(new String[] {"street_address", "city", "state"}, Types.STRING, Types.STRING, Types.STRING));
-
-		assertEquals(expected, result);
-	}
-
-	@Test
-	public void testAtomicType() {
-		final TypeInformation<?> result = JsonSchemaConverter.convert("{ type: 'number' }");
-
-		assertEquals(Types.BIG_DEC, result);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testMissingType() {
-		JsonSchemaConverter.convert("{ }");
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testWrongType() {
-		JsonSchemaConverter.convert("{ type: 'whatever' }");
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testArrayWithAdditionalItems() {
-		JsonSchemaConverter.convert("{ type: 'array', items: [{type: 'integer'}], additionalItems: true }");
-	}
-
-	@Test
-	public void testMissingProperties() {
-		final TypeInformation<?> result = JsonSchemaConverter.convert("{ type: 'object' }");
-
-		assertEquals(Types.ROW(), result);
-	}
-
-	@Test
-	public void testNullUnionTypes() {
-		final TypeInformation<?> result = JsonSchemaConverter.convert("{ type: ['string', 'null'] }");
-
-		assertEquals(Types.STRING, result);
-	}
-
-	@Test
-	public void testTimestamp() {
-		final TypeInformation<?> result = JsonSchemaConverter.convert("{ type: 'string', format: 'date-time' }");
-
-		assertEquals(Types.SQL_TIMESTAMP, result);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 05255fd..043a345 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.table.descriptors.{BatchTableSourceDescriptor, ConnectorDescriptor}
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.{Expression, TimeAttribute}
 import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -110,16 +111,44 @@ abstract class BatchTableEnvironment(
     }
   }
 
-// TODO expose this once we have enough table source factories that can deal with it
-//  /**
-//    * Creates a table from a descriptor that describes the source connector, source encoding,
-//    * the resulting table schema, and other properties.
-//    *
-//    * @param connectorDescriptor connector descriptor describing the source of the table
-//    */
-//  def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = {
-//    new BatchTableSourceDescriptor(this, connectorDescriptor)
-//  }
+  /**
+    * Creates a table from a descriptor that describes the source connector, the source format,
+    * the resulting table schema, and other properties.
+    *
+    * Descriptors allow for declaring communication with external systems in an
+    * implementation-agnostic way. The classpath is scanned for connectors, and matching
+    * connectors are configured accordingly.
+    *
+    * The following example shows how to read from a Kafka connector using a JSON format and
+    * create a table:
+    *
+    * {{{
+    *
+    * tableEnv
+    *   .from(
+    *     new Kafka()
+    *       .version("0.11")
+    *       .topic("clicks")
+    *       .property("zookeeper.connect", "localhost")
+    *       .property("group.id", "click-group")
+    *       .startFromEarliest())
+    *   .withFormat(
+    *     new Json()
+    *       .jsonSchema("{...}")
+    *       .failOnMissingField(false))
+    *   .withSchema(
+    *     new Schema()
+    *       .field("user-name", "VARCHAR").from("u_name")
+    *       .field("count", "DECIMAL")
+    *       .field("proc-time", "TIMESTAMP").proctime())
+    *   .toTable()
+    * }}}
+    *
+    * @param connectorDescriptor connector descriptor describing the source of the table
+    */
+  def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = {
+    new BatchTableSourceDescriptor(this, connectorDescriptor)
+  }
 
   /**
     * Registers an external [[TableSink]] with given field names and types in this

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 84d7240..510fe0d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter}
+import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableSourceDescriptor}
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -128,16 +129,44 @@ abstract class StreamTableEnvironment(
     }
   }
 
-// TODO expose this once we have enough table source factories that can deal with it
-//  /**
-//    * Creates a table from a descriptor that describes the source connector, source encoding,
-//    * the resulting table schema, and other properties.
-//    *
-//    * @param connectorDescriptor connector descriptor describing the source of the table
-//    */
-//  def from(connectorDescriptor: ConnectorDescriptor): StreamTableSourceDescriptor = {
-//    new StreamTableSourceDescriptor(this, connectorDescriptor)
-//  }
+  /**
+    * Creates a table from a descriptor that describes the source connector, the source format,
+    * the resulting table schema, and other properties.
+    *
+    * Descriptors allow for declaring communication with external systems in an
+    * implementation-agnostic way. The classpath is scanned for connectors, and matching
+    * connectors are configured accordingly.
+    *
+    * The following example shows how to read from a Kafka connector using a JSON format and
+    * create a table:
+    *
+    * {{{
+    *
+    * tableEnv
+    *   .from(
+    *     new Kafka()
+    *       .version("0.11")
+    *       .topic("clicks")
+    *       .property("zookeeper.connect", "localhost")
+    *       .property("group.id", "click-group")
+    *       .startFromEarliest())
+    *   .withFormat(
+    *     new Json()
+    *       .jsonSchema("{...}")
+    *       .failOnMissingField(false))
+    *   .withSchema(
+    *     new Schema()
+    *       .field("user-name", "VARCHAR").from("u_name")
+    *       .field("count", "DECIMAL")
+    *       .field("proc-time", "TIMESTAMP").proctime())
+    *   .toTable()
+    * }}}
+    *
+    * @param connectorDescriptor connector descriptor describing the source of the table
+    */
+  def from(connectorDescriptor: ConnectorDescriptor): StreamTableSourceDescriptor = {
+    new StreamTableSourceDescriptor(this, connectorDescriptor)
+  }
 
   /**
     * Registers an external [[TableSink]] with given field names and types in this

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index d6106be..88dc1e9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -49,6 +49,7 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
 import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
+import org.apache.flink.table.descriptors.{ConnectorDescriptor, TableSourceDescriptor}
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
@@ -511,6 +512,43 @@ abstract class TableEnvironment(val config: TableConfig) {
     }
   }
 
+  /**
+    * Creates a table from a descriptor that describes the source connector, the source format,
+    * the resulting table schema, and other properties.
+    *
+    * Descriptors allow for declaring communication with external systems in an
+    * implementation-agnostic way. The classpath is scanned for connectors, and matching
+    * connectors are configured accordingly.
+    *
+    * The following example shows how to read from a Kafka connector using a JSON format and
+    * create a table:
+    *
+    * {{{
+    *
+    * tableEnv
+    *   .from(
+    *     new Kafka()
+    *       .version("0.11")
+    *       .topic("clicks")
+    *       .property("zookeeper.connect", "localhost")
+    *       .property("group.id", "click-group")
+    *       .startFromEarliest())
+    *   .withFormat(
+    *     new Json()
+    *       .jsonSchema("{...}")
+    *       .failOnMissingField(false))
+    *   .withSchema(
+    *     new Schema()
+    *       .field("user-name", "VARCHAR").from("u_name")
+    *       .field("count", "DECIMAL")
+    *       .field("proc-time", "TIMESTAMP").proctime())
+    *   .toTable()
+    * }}}
+    *
+    * @param connectorDescriptor connector descriptor describing the source of the table
+    */
+  def from(connectorDescriptor: ConnectorDescriptor): TableSourceDescriptor
+
   private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = {
     require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
     val schemaPaths = tablePath.slice(0, tablePath.length - 1)

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
index 6389b55..bcee5ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
@@ -24,6 +24,7 @@ import _root_.scala.collection.mutable.ArrayBuffer
 import _root_.java.util.Objects
 
 import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.types.Row
 
 /**
   * A TableSchema represents a Table's structure.
@@ -135,6 +136,15 @@ class TableSchema(
     new TableSchema(columnNames, converted)
   }
 
+  /**
+    * Converts a table schema into a (nested) type information describing a [[Row]].
+    *
+    * @return type information where columns are fields of a row
+    */
+  def toRowType: TypeInformation[Row] = {
+    Types.ROW(getColumnNames, getTypes)
+  }
+
   override def toString: String = {
     val builder = new StringBuilder
     builder.append("root\n")

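A minimal sketch of the new helper when called from Java, assuming the
two-array constructor used elsewhere in this file (names are illustrative
only):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.types.Row;

    public class RowTypeExample {
        public static void main(String[] args) {
            TableSchema schema = new TableSchema(
                new String[] {"user", "cnt"},
                new TypeInformation<?>[] {Types.STRING, Types.LONG});

            // folds all columns of the schema into a single (nested) row type
            TypeInformation<Row> rowType = schema.toRowType();
            System.out.println(rowType);
        }
    }
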
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index eb87c9d..e266a47 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api
 
 import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.formats.TableFormatFactory
 
 /**
   * Exception for all errors occurring during expression parsing.
@@ -142,6 +143,85 @@ case class CatalogAlreadyExistException(
 }
 
 /**
+  * Exception for not finding a [[org.apache.flink.table.formats.TableFormatFactory]] for the
+  * given properties.
+  *
+  * @param message message that indicates the current matching step
+  * @param factoryClass required factory class
+  * @param formatFactories all found factories
+  * @param properties properties that describe the table format
+  * @param cause the cause
+  */
+case class NoMatchingTableFormatException(
+      message: String,
+      factoryClass: Class[_],
+      formatFactories: Seq[TableFormatFactory[_]],
+      properties: Map[String, String],
+      cause: Throwable)
+    extends RuntimeException(
+      s"""Could not find a suitable table format factory for '${factoryClass.getName}' in
+        |the classpath.
+        |
+        |Reason: $message
+        |
+        |The following properties are requested:
+        |${DescriptorProperties.toString(properties)}
+        |
+        |The following format factories have been considered:
+        |${formatFactories.map(_.getClass.getName).mkString("\n")}
+        |""".stripMargin,
+      cause) {
+
+  def this(
+      message: String,
+      factoryClass: Class[_],
+      formatFactories: Seq[TableFormatFactory[_]],
+      properties: Map[String, String]) = {
+    this(message, factoryClass, formatFactories, properties, null)
+  }
+}
+
+/**
+  * Exception for finding more than one [[org.apache.flink.table.formats.TableFormatFactory]] for
+  * the given properties.
+  *
+  * @param matchingFormatFactories format factories that match the properties
+  * @param factoryClass required factory class
+  * @param formatFactories all found factories
+  * @param properties properties that describe the table format
+  * @param cause the cause
+  */
+case class AmbiguousTableFormatException(
+      matchingFormatFactories: Seq[TableFormatFactory[_]],
+      factoryClass: Class[_],
+      formatFactories: Seq[TableFormatFactory[_]],
+      properties: Map[String, String],
+      cause: Throwable)
+    extends RuntimeException(
+      s"""More than one suitable table format factory for '${factoryClass.getName}' could
+        |be found in the classpath.
+        |
+        |The following format factories match:
+        |${matchingFormatFactories.map(_.getClass.getName).mkString("\n")}
+        |
+        |The following properties are requested:
+        |${DescriptorProperties.toString(properties)}
+        |
+        |The following format factories have been considered:
+        |${formatFactories.map(_.getClass.getName).mkString("\n")}
+        |""".stripMargin,
+      cause) {
+
+  def this(
+      matchingFormatFactories: Seq[TableFormatFactory[_]],
+      factoryClass: Class[_],
+      formatFactories: Seq[TableFormatFactory[_]],
+      properties: Map[String, String]) = {
+    this(matchingFormatFactories, factoryClass, formatFactories, properties, null)
+  }
+}
+
+/**
   * Exception for not finding a [[org.apache.flink.table.sources.TableSourceFactory]] for the
   * given properties.
   *

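A minimal sketch of how user code might surface the new exception, assuming
the TableFormatFactoryService lookup exercised elsewhere in this commit (the
wrapper class and message are illustrative only):

    import java.util.Map;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.table.api.NoMatchingTableFormatException;
    import org.apache.flink.table.formats.DeserializationSchemaFactory;
    import org.apache.flink.table.formats.TableFormatFactoryService;

    public class FormatLookup {
        public static DeserializationSchema<?> findSchema(Map<String, String> properties) {
            try {
                return TableFormatFactoryService
                    .find(DeserializationSchemaFactory.class, properties)
                    .createDeserializationSchema(properties);
            } catch (NoMatchingTableFormatException e) {
                // the exception message already lists the requested properties
                // and all factories that were considered
                throw new IllegalStateException(
                    "No matching format; check 'format.type' and the classpath.", e);
            }
        }
    }
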
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
index 4f5bd81..812b78a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
@@ -30,6 +30,7 @@ import org.apache.commons.codec.binary.Base64
 import org.apache.commons.lang.StringEscapeUtils
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.{TableException, TableSchema, ValidationException}
 import org.apache.flink.table.descriptors.DescriptorProperties.{NAME, TYPE, normalizeTableSchema, toJava}
 import org.apache.flink.table.typeutils.TypeStringUtils
@@ -950,7 +951,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
       validateString(prefix + NAME, isOptional = false, minLen = 1)
     }
     val typeValidation = (prefix: String) => {
-      validateType(prefix + TYPE, isOptional = false)
+      validateType(prefix + TYPE, requireRow = false, isOptional = false)
     }
 
     validateFixedIndexedProperties(
@@ -998,13 +999,19 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
   /**
     * Validates a type property.
     */
-  def validateType(key: String, isOptional: Boolean): Unit = {
+  def validateType(key: String, requireRow: Boolean, isOptional: Boolean): Unit = {
     if (!properties.contains(key)) {
       if (!isOptional) {
         throw new ValidationException(s"Could not find required property '$key'.")
       }
     } else {
-      TypeStringUtils.readTypeInfo(properties(key)) // throws validation exceptions
+      // we don't validate the string ourselves but let the parser do the work;
+      // it throws a validation exception on invalid input
+      val info = TypeStringUtils.readTypeInfo(properties(key))
+      if (requireRow && !info.isInstanceOf[RowTypeInfo]) {
+        throw new ValidationException(
+          s"Row type information expected for '$key' but was: ${properties(key)}")
+      }
     }
   }
 
@@ -1079,7 +1086,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
     */
   private def put(key: String, value: String): Unit = {
     if (properties.contains(key)) {
-      throw new IllegalStateException("Property already present.")
+      throw new IllegalStateException("Property already present: " + key)
     }
     if (normalizeKeys) {
       properties.put(key.toLowerCase, value)
@@ -1263,7 +1270,7 @@ object DescriptorProperties {
   }
 
   def toString(keyOrValue: String): String = {
-    StringEscapeUtils.escapeJava(keyOrValue)
+    StringEscapeUtils.escapeJava(keyOrValue).replace("\\/", "/") // '/' must not be escaped
   }
 
   def toString(key: String, value: String): String = {

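A minimal sketch of the stricter type validation, assuming the putString
setter that the descriptors use and TypeStringUtils' row syntax (key names
and the example class are illustrative only):

    import org.apache.flink.table.descriptors.DescriptorProperties;

    public class TypeValidationExample {
        public static void main(String[] args) {
            DescriptorProperties props = new DescriptorProperties(true);
            props.putString("schema.derived-type", "ROW(name VARCHAR, amount DECIMAL)");

            // passes: the value parses to a RowTypeInfo
            props.validateType("schema.derived-type", true, false);

            // an atomic type with requireRow = true would fail:
            // props.putString("schema.atomic-type", "VARCHAR");
            // props.validateType("schema.atomic-type", true, false); // ValidationException
        }
    }
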
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
index 301189a..3d44c24 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
@@ -34,6 +34,11 @@ class FormatDescriptorValidator extends DescriptorValidator {
 object FormatDescriptorValidator {
 
   /**
+    * Prefix for format-related properties.
+    */
+  val FORMAT = "format"
+
+  /**
     * Key for describing the type of the format. Usually used for factory discovery.
     */
   val FORMAT_TYPE = "format.type"

http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
index fdec820..160347e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
@@ -28,7 +28,11 @@ import scala.collection.JavaConverters._
 /**
   * Validator for [[Rowtime]].
   */
-class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
+class RowtimeValidator(
+    supportsSourceTimestamps: Boolean,
+    supportsSourceWatermarks: Boolean,
+    prefix: String = "")
+  extends DescriptorValidator {
 
   override def validate(properties: DescriptorProperties): Unit = {
     val timestampExistingField = (_: String) => {
@@ -43,14 +47,21 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
         prefix + ROWTIME_TIMESTAMPS_SERIALIZED, isOptional = false, minLen = 1)
     }
 
-    properties.validateEnum(
-      prefix + ROWTIME_TIMESTAMPS_TYPE,
-      isOptional = false,
+    val timestampsValidation = if (supportsSourceTimestamps) {
       Map(
         ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> toJava(timestampExistingField),
         ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE -> properties.noValidation(),
-        ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom)
-      ).asJava
+        ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom))
+    } else {
+      Map(
+        ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> toJava(timestampExistingField),
+        ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom))
+    }
+
+    properties.validateEnum(
+      prefix + ROWTIME_TIMESTAMPS_TYPE,
+      isOptional = false,
+      timestampsValidation.asJava
     )
 
     val watermarkPeriodicBounded = (_: String) => {
@@ -65,15 +76,23 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
         prefix + ROWTIME_WATERMARKS_SERIALIZED, isOptional = false, minLen = 1)
     }
 
-    properties.validateEnum(
-      prefix + ROWTIME_WATERMARKS_TYPE,
-      isOptional = false,
+    val watermarksValidation = if (supportsSourceWatermarks) {
       Map(
         ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> properties.noValidation(),
         ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED -> toJava(watermarkPeriodicBounded),
         ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE -> properties.noValidation(),
-        ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom)
-      ).asJava
+        ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom))
+    } else {
+      Map(
+        ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> properties.noValidation(),
+        ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED -> toJava(watermarkPeriodicBounded),
+        ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom))
+    }
+
+    properties.validateEnum(
+      prefix + ROWTIME_WATERMARKS_TYPE,
+      isOptional = false,
+      watermarksValidation.asJava
     )
   }
 }
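
A minimal sketch of the new constructor from a caller's perspective; the
capability flags are illustrative, and the properties must be populated with
rowtime keys before validating:

    import org.apache.flink.table.descriptors.DescriptorProperties;
    import org.apache.flink.table.descriptors.RowtimeValidator;

    public class RowtimeValidationExample {
        public static void main(String[] args) {
            DescriptorProperties props = new DescriptorProperties(true);
            // ... put 'rowtime.timestamps.*' and 'rowtime.watermarks.*' keys here ...

            // a source that extracts its own timestamps but cannot forward source
            // watermarks now rejects watermarks of type 'from-source' on validation
            RowtimeValidator validator = new RowtimeValidator(true, false, "");
            validator.validate(props);
        }
    }
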
@@ -154,7 +173,7 @@ object RowtimeValidator {
         new ExistingField(field)
 
       case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE =>
-        new StreamRecordTimestamp
+        StreamRecordTimestamp.INSTANCE
 
       case ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM =>
         val clazz = properties.getClass(