You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/15 09:24:51 UTC
[06/13] flink git commit: [FLINK-8558] [table] Add unified format
interfaces and separate formats from connectors
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowFormatFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowFormatFactoryTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowFormatFactoryTest.java
new file mode 100644
index 0000000..4f5e218
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowFormatFactoryTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.table.descriptors.Avro;
+import org.apache.flink.table.descriptors.Descriptor;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.formats.DeserializationSchemaFactory;
+import org.apache.flink.table.formats.SerializationSchemaFactory;
+import org.apache.flink.table.formats.TableFormatFactoryService;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link AvroRowFormatFactory}.
+ */
+public class AvroRowFormatFactoryTest extends TestLogger {
+
+ private static final Class<User> AVRO_SPECIFIC_RECORD = User.class;
+
+ private static final String AVRO_SCHEMA = User.getClassSchema().toString();
+
+ @Test
+ public void testRecordClass() {
+ final Map<String, String> properties = toMap(new Avro().recordClass(AVRO_SPECIFIC_RECORD));
+
+ testRecordClassDeserializationSchema(properties);
+
+ testRecordClassSerializationSchema(properties);
+ }
+
+ @Test
+ public void testAvroSchema() {
+ final Map<String, String> properties = toMap(new Avro().avroSchema(AVRO_SCHEMA));
+
+ testAvroSchemaSerializationSchema(properties);
+
+ testAvroSchemaDeserializationSchema(properties);
+ }
+
+ private void testRecordClassDeserializationSchema(Map<String, String> properties) { // factory-created DeserializationSchema must equal one built from the record class
+ final DeserializationSchema<?> actual2 = TableFormatFactoryService
+ .find(DeserializationSchemaFactory.class, properties)
+ .createDeserializationSchema(properties);
+ final AvroRowDeserializationSchema expected2 = new AvroRowDeserializationSchema(AVRO_SPECIFIC_RECORD);
+ assertEquals(expected2, actual2);
+ }
+
+ private void testRecordClassSerializationSchema(Map<String, String> properties) { // factory-created SerializationSchema must equal one built from the record class
+ final SerializationSchema<?> actual1 = TableFormatFactoryService
+ .find(SerializationSchemaFactory.class, properties)
+ .createSerializationSchema(properties);
+ final SerializationSchema<?> expected1 = new AvroRowSerializationSchema(AVRO_SPECIFIC_RECORD);
+ assertEquals(expected1, actual1);
+ }
+
+ private void testAvroSchemaDeserializationSchema(Map<String, String> properties) {
+ final DeserializationSchema<?> actual2 = TableFormatFactoryService
+ .find(DeserializationSchemaFactory.class, properties)
+ .createDeserializationSchema(properties);
+ final AvroRowDeserializationSchema expected2 = new AvroRowDeserializationSchema(AVRO_SCHEMA);
+ assertEquals(expected2, actual2);
+ }
+
+ private void testAvroSchemaSerializationSchema(Map<String, String> properties) {
+ final SerializationSchema<?> actual1 = TableFormatFactoryService
+ .find(SerializationSchemaFactory.class, properties)
+ .createSerializationSchema(properties);
+ final SerializationSchema<?> expected1 = new AvroRowSerializationSchema(AVRO_SCHEMA);
+ assertEquals(expected1, actual1);
+ }
+
+ private static Map<String, String> toMap(Descriptor... desc) {
+ final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+ for (Descriptor d : desc) {
+ d.addProperties(descriptorProperties);
+ }
+ return descriptorProperties.asMap();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index df52851..dc8a116 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -38,6 +38,7 @@ import java.lang.reflect.Array;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
+import java.util.Objects;
/**
* Deserialization schema from JSON to Flink types.
@@ -84,7 +85,7 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
* @see <a href="http://json-schema.org/">http://json-schema.org/</a>
*/
public JsonRowDeserializationSchema(String jsonSchema) {
- this(JsonSchemaConverter.convert(jsonSchema));
+ this(JsonRowSchemaConverter.convert(jsonSchema));
}
@Override
@@ -118,6 +119,23 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
this.failOnMissingField = failOnMissingField;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final JsonRowDeserializationSchema that = (JsonRowDeserializationSchema) o;
+ return failOnMissingField == that.failOnMissingField && Objects.equals(typeInfo, that.typeInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(typeInfo, failOnMissingField);
+ }
+
// --------------------------------------------------------------------------------------------
private Object convert(JsonNode node, TypeInformation<?> info) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
new file mode 100644
index 0000000..fd7bda6
--- /dev/null
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowFormatFactory.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptorValidator;
+import org.apache.flink.table.descriptors.JsonValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.formats.DeserializationSchemaFactory;
+import org.apache.flink.table.formats.SerializationSchemaFactory;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Table format factory for providing configured instances of JSON-to-row {@link SerializationSchema}
+ * and {@link DeserializationSchema}.
+ */
+public class JsonRowFormatFactory implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> {
+
+ @Override
+ public Map<String, String> requiredContext() {
+ final Map<String, String> context = new HashMap<>();
+ context.put(FormatDescriptorValidator.FORMAT_TYPE(), JsonValidator.FORMAT_TYPE_VALUE);
+ context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION(), "1");
+ return context;
+ }
+
+ @Override
+ public boolean supportsSchemaDerivation() {
+ return true;
+ }
+
+ @Override
+ public List<String> supportedProperties() {
+ final List<String> properties = new ArrayList<>();
+ properties.add(JsonValidator.FORMAT_JSON_SCHEMA);
+ properties.add(JsonValidator.FORMAT_SCHEMA);
+ properties.add(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD);
+ properties.add(FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA());
+ properties.addAll(SchemaValidator.getSchemaDerivationKeys());
+ return properties;
+ }
+
+ @Override
+ public DeserializationSchema<Row> createDeserializationSchema(Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties = validateAndGetProperties(properties);
+
+ // create and configure
+ final JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema(createTypeInformation(descriptorProperties));
+
+ descriptorProperties.getOptionalBoolean(JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD)
+ .ifPresent(schema::setFailOnMissingField);
+
+ return schema;
+ }
+
+ @Override
+ public SerializationSchema<Row> createSerializationSchema(Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties = validateAndGetProperties(properties);
+
+ // create and configure
+ return new JsonRowSerializationSchema(createTypeInformation(descriptorProperties));
+ }
+
+ private static DescriptorProperties validateAndGetProperties(Map<String, String> propertiesMap) {
+ final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+ descriptorProperties.putProperties(propertiesMap);
+
+ // validate
+ new JsonValidator().validate(descriptorProperties);
+
+ return descriptorProperties;
+ }
+
+ private static TypeInformation<Row> createTypeInformation(DescriptorProperties descriptorProperties) {
+ if (descriptorProperties.containsKey(JsonValidator.FORMAT_SCHEMA)) {
+ return (RowTypeInfo) descriptorProperties.getType(JsonValidator.FORMAT_SCHEMA);
+ } else if (descriptorProperties.containsKey(JsonValidator.FORMAT_JSON_SCHEMA)) {
+ return JsonRowSchemaConverter.convert(descriptorProperties.getString(JsonValidator.FORMAT_JSON_SCHEMA));
+ } else {
+ return SchemaValidator.deriveFormatFields(descriptorProperties).toRowType();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
new file mode 100644
index 0000000..320ca1f
--- /dev/null
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSchemaConverter.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Converts a JSON schema into Flink's type information. It uses {@link Row} for representing
+ * objects and tuple arrays.
+ *
+ * <p>Note: This converter implements just a subset of the JSON schema specification.
+ * Union types (as well as "allOf", "anyOf", "not") are not supported yet. Simple
+ * references that link to a common definition in the document are supported. "oneOf" and
+ * arrays of types are only supported for specifying nullability.
+ *
+ * <p>This converter has been developed for JSON Schema draft-07 but also includes keywords of
+ * older drafts to be as compatible as possible.
+ */
+public final class JsonRowSchemaConverter {
+
+ private JsonRowSchemaConverter() {
+ // private
+ }
+
+ // see https://spacetelescope.github.io/understanding-json-schema/UnderstandingJSONSchema.pdf
+ private static final String PROPERTIES = "properties";
+ private static final String ADDITIONAL_PROPERTIES = "additionalProperties";
+ private static final String TYPE = "type";
+ private static final String FORMAT = "format";
+ private static final String CONTENT_ENCODING = "contentEncoding";
+ private static final String ITEMS = "items";
+ private static final String ADDITIONAL_ITEMS = "additionalItems";
+ private static final String REF = "$ref";
+ private static final String ALL_OF = "allOf";
+ private static final String ANY_OF = "anyOf";
+ private static final String NOT = "not";
+ private static final String ONE_OF = "oneOf";
+
+ // from https://tools.ietf.org/html/draft-zyp-json-schema-03#page-14
+ private static final String DISALLOW = "disallow";
+ private static final String EXTENDS = "extends";
+
+ private static final String TYPE_NULL = "null";
+ private static final String TYPE_BOOLEAN = "boolean";
+ private static final String TYPE_OBJECT = "object";
+ private static final String TYPE_ARRAY = "array";
+ private static final String TYPE_NUMBER = "number";
+ private static final String TYPE_INTEGER = "integer";
+ private static final String TYPE_STRING = "string";
+
+ private static final String FORMAT_DATE = "date";
+ private static final String FORMAT_TIME = "time";
+ private static final String FORMAT_DATE_TIME = "date-time";
+
+ private static final String CONTENT_ENCODING_BASE64 = "base64";
+
+ /**
+ * Converts a JSON schema into Flink's type information. Throws an exception if the schema
+ * cannot be converted because of loss of precision or a too flexible schema.
+ *
+ * <p>The converter can resolve simple schema references to solve those cases where entities
+ * are defined at the beginning and then used throughout a document.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> TypeInformation<T> convert(String jsonSchema) {
+ Preconditions.checkNotNull(jsonSchema, "JSON schema");
+ final ObjectMapper mapper = new ObjectMapper();
+ mapper.getFactory()
+ .enable(JsonParser.Feature.ALLOW_COMMENTS)
+ .enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES)
+ .enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);
+ final JsonNode node;
+ try {
+ node = mapper.readTree(jsonSchema);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(
+ "Invalid JSON schema.", e);
+ }
+ return (TypeInformation<T>) convertType("<root>", node, node);
+ }
+
+ private static TypeInformation<?> convertType(String location, JsonNode node, JsonNode root) {
+ // we use a set here to unify types (e.g. types that just add metadata such as 'multipleOf')
+ final Set<TypeInformation<?>> typeSet = new HashSet<>();
+
+ // search for ref
+ final Optional<JsonNode> ref;
+ if (node.has(REF) && node.get(REF).isTextual()) {
+ // try a simple ref resolver to solve those cases where entities are defined at
+ // the beginning and then used throughout a document
+ ref = Optional.of(resolveReference(node.get(REF).asText(), node, root));
+ } else {
+ ref = Optional.empty();
+ }
+
+ // use TYPE of this node
+ if (node.has(TYPE)) {
+ final JsonNode typeNode = node.get(TYPE);
+
+ List<String> types = new ArrayList<>();
+ // array of types
+ if (typeNode.isArray()) {
+ final Iterator<JsonNode> elements = typeNode.elements();
+ while (elements.hasNext()) {
+ types.add(elements.next().asText());
+ }
+ }
+ // single type
+ else if (typeNode.isTextual()) {
+ types.add(typeNode.asText());
+ }
+
+ for (String type : types) {
+ // set field type
+ switch (type) {
+ case TYPE_NULL:
+ typeSet.add(Types.VOID);
+ break;
+ case TYPE_BOOLEAN:
+ typeSet.add(Types.BOOLEAN);
+ break;
+ case TYPE_STRING:
+ if (node.has(FORMAT)) {
+ typeSet.add(convertStringFormat(location, node.get(FORMAT)));
+ } else if (node.has(CONTENT_ENCODING)) {
+ typeSet.add(convertStringEncoding(location, node.get(CONTENT_ENCODING)));
+ } else {
+ typeSet.add(Types.STRING);
+ }
+ break;
+ case TYPE_NUMBER:
+ typeSet.add(Types.BIG_DEC);
+ break;
+ case TYPE_INTEGER:
+ // use BigDecimal for easier interoperability
+ // without affecting the correctness of the result
+ typeSet.add(Types.BIG_DEC);
+ break;
+ case TYPE_OBJECT:
+ typeSet.add(convertObject(location, node, root));
+ break;
+ case TYPE_ARRAY:
+ typeSet.add(convertArray(location, node, root));
+ break;
+ default: // report the offending entry itself; the TYPE node may be an array whose asText() is empty
+ throw new IllegalArgumentException(
+ "Unsupported type '" + type + "' in node: " + location);
+ }
+ }
+ }
+ // use TYPE of reference as fallback if present
+ else {
+ ref.filter(r -> r.has(TYPE)).ifPresent(r -> typeSet.add(convertType(node.get(REF).asText(), r, root)));
+ }
+
+ // simple interpretation of ONE_OF for supporting "object or null"
+ if (node.has(ONE_OF) && node.get(ONE_OF).isArray()) {
+ final TypeInformation<?>[] types = convertTypes(location + '/' + ONE_OF, node.get(ONE_OF), root);
+ typeSet.addAll(Arrays.asList(types));
+ }
+ // use ONE_OF of reference as fallback
+ else if (ref.isPresent() && ref.get().has(ONE_OF) && ref.get().get(ONE_OF).isArray()) {
+ final TypeInformation<?>[] types = convertTypes(node.get(REF).asText() + '/' + ONE_OF, ref.get().get(ONE_OF), root);
+ typeSet.addAll(Arrays.asList(types));
+ }
+
+ // validate no union types or extending
+ if (node.has(ALL_OF) || node.has(ANY_OF) || node.has(NOT) || node.has(EXTENDS) || node.has(DISALLOW)) {
+ throw new IllegalArgumentException(
+ "Union types are such as '" + ALL_OF + "', '" + ANY_OF + "' etc. " +
+ "and extending are not supported yet.");
+ }
+
+ // only a type (with null) is supported yet
+ final List<TypeInformation<?>> types = new ArrayList<>(typeSet);
+ if (types.size() == 0) {
+ throw new IllegalArgumentException("No type could be found in node:" + location);
+ } else if (types.size() > 2 || (types.size() == 2 && !types.contains(Types.VOID))) {
+ throw new IllegalArgumentException(
+ "Union types with more than just a null type are not supported yet.");
+ }
+
+ // return the first non-void type or void
+ if (types.size() == 2 && types.get(0) == Types.VOID) {
+ return types.get(1);
+ } else {
+ return types.get(0);
+ }
+ }
+
+ private static TypeInformation<Row> convertObject(String location, JsonNode node, JsonNode root) {
+ // no 'properties' declared: return Types.ROW() without any field types
+ if (!node.has(PROPERTIES)) {
+ return Types.ROW();
+ }
+ final JsonNode props = node.get(PROPERTIES);
+ if (!props.isObject()) {
+ throw new IllegalArgumentException(
+ "Invalid '" + PROPERTIES + "' property for object type in node: " + location);
+ }
+ final String[] names = new String[props.size()];
+ final TypeInformation<?>[] types = new TypeInformation[props.size()];
+
+ final Iterator<Map.Entry<String, JsonNode>> fieldIter = props.fields();
+ int i = 0;
+ while (fieldIter.hasNext()) {
+ final Map.Entry<String, JsonNode> subNode = fieldIter.next();
+
+ // set field name
+ names[i] = subNode.getKey();
+
+ // set type
+ types[i] = convertType(location + '/' + subNode.getKey(), subNode.getValue(), root);
+
+ i++;
+ }
+
+ // validate that object does not contain additional properties
+ if (node.has(ADDITIONAL_PROPERTIES) && node.get(ADDITIONAL_PROPERTIES).isBoolean() &&
+ node.get(ADDITIONAL_PROPERTIES).asBoolean()) {
+ throw new IllegalArgumentException(
+ "An object must not allow additional properties in node: " + location);
+ }
+
+ return Types.ROW_NAMED(names, types);
+ }
+
+ private static TypeInformation<?> convertArray(String location, JsonNode node, JsonNode root) {
+ // validate items
+ if (!node.has(ITEMS)) {
+ throw new IllegalArgumentException(
+ "Arrays must specify an '" + ITEMS + "' property in node: " + location);
+ }
+ final JsonNode items = node.get(ITEMS);
+
+ // list (translated to object array)
+ if (items.isObject()) {
+ final TypeInformation<?> elementType = convertType(
+ location + '/' + ITEMS,
+ items,
+ root);
+ // result type might either be ObjectArrayTypeInfo or BasicArrayTypeInfo for Strings
+ return Types.OBJECT_ARRAY(elementType);
+ }
+ // tuple (translated to row)
+ else if (items.isArray()) {
+ final TypeInformation<?>[] types = convertTypes(location + '/' + ITEMS, items, root);
+
+ // validate that array does not contain additional items
+ if (node.has(ADDITIONAL_ITEMS) && node.get(ADDITIONAL_ITEMS).isBoolean() &&
+ node.get(ADDITIONAL_ITEMS).asBoolean()) {
+ throw new IllegalArgumentException(
+ "An array tuple must not allow additional items in node: " + location);
+ }
+
+ return Types.ROW(types);
+ }
+ throw new IllegalArgumentException(
+ "Invalid type for '" + ITEMS + "' property in node: " + location);
+ }
+
+ private static TypeInformation<?> convertStringFormat(String location, JsonNode node) {
+ if (!node.isTextual()) {
+ throw new IllegalArgumentException("Invalid '" + FORMAT + "' property in node: " + location);
+ }
+
+ switch (node.asText()) {
+ case FORMAT_DATE:
+ return Types.SQL_DATE;
+ case FORMAT_TIME:
+ return Types.SQL_TIME;
+ case FORMAT_DATE_TIME:
+ return Types.SQL_TIMESTAMP;
+ default:
+ return Types.STRING; // unlikely that we will support other formats in the future
+ }
+ }
+
+ private static TypeInformation<?> convertStringEncoding(String location, JsonNode node) {
+ if (!node.isTextual()) {
+ throw new IllegalArgumentException("Invalid '" + CONTENT_ENCODING + "' property in node: " + location);
+ }
+
+ // "If the instance value is a string, this property defines that the string SHOULD
+ // be interpreted as binary data and decoded using the encoding named by this property."
+
+ switch (node.asText()) {
+ case CONTENT_ENCODING_BASE64:
+ return Types.PRIMITIVE_ARRAY(Types.BYTE);
+ default:
+ // we fail hard here:
+ // this gives us the chance to support more encodings in the future without problems
+ // of backwards compatibility
+ throw new IllegalArgumentException("Invalid encoding '" + node.asText() + "' in node: " + location);
+ }
+ }
+
+ private static JsonNode resolveReference(String ref, JsonNode origin, JsonNode root) {
+ if (!ref.startsWith("#")) {
+ throw new IllegalArgumentException("Only JSON schemes with simple references " +
+ "(one indirection in the same document) are supported yet. But was: " + ref);
+ }
+ final String path = ref.substring(1);
+ final JsonNode foundNode = root.at(path);
+ if (foundNode.isMissingNode()) {
+ throw new IllegalArgumentException("Could not find reference: " + ref);
+ }
+ // prevent obvious cyclic references
+ if (foundNode == origin) {
+ throw new IllegalArgumentException("Cyclic references are not supported:" + ref);
+ }
+ return foundNode;
+ }
+
+ private static TypeInformation<?>[] convertTypes(String location, JsonNode arrayNode, JsonNode root) {
+ final TypeInformation<?>[] types = new TypeInformation[arrayNode.size()];
+ final Iterator<JsonNode> elements = arrayNode.elements();
+ int i = 0;
+ while (elements.hasNext()) {
+ final TypeInformation<?> elementType = convertType(
+ location + '[' + i + ']',
+ elements.next(),
+ root);
+ types[i] = elementType;
+ i += 1;
+ }
+ return types;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
index 8fee6a4..d942062 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
@@ -39,6 +39,7 @@ import java.math.BigInteger;
import java.sql.Time;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
+import java.util.Objects;
/**
* Serialization schema that serializes an object of Flink types into a JSON bytes.
@@ -89,7 +90,7 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
* @see <a href="http://json-schema.org/">http://json-schema.org/</a>
*/
public JsonRowSerializationSchema(String jsonSchema) {
- this(JsonSchemaConverter.convert(jsonSchema));
+ this(JsonRowSchemaConverter.convert(jsonSchema));
}
@Override
@@ -107,6 +108,23 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
}
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final JsonRowSerializationSchema that = (JsonRowSerializationSchema) o;
+ return Objects.equals(typeInfo, that.typeInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(typeInfo);
+ }
+
// --------------------------------------------------------------------------------------------
private ObjectNode convertRow(ObjectNode reuse, RowTypeInfo info, Row row) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java
deleted file mode 100644
index 7a001f6..0000000
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * Converts a JSON schema into Flink's type information. It uses {@link Row} for representing
- * objects and tuple arrays.
- *
- * <p>Note: This converter implements just a subset of the JSON schema specification.
- * Union types (as well as "allOf", "anyOf", "not") are not supported yet. Simple
- * references that link to a common definition in the document are supported. "oneOf" and
- * arrays of types are only supported for specifying nullability.
- *
- * <p>This converter has been developed for JSON Schema draft-07 but also includes keywords of
- * older drafts to be as compatible as possible.
- */
-public final class JsonSchemaConverter {
-
- private JsonSchemaConverter() {
- // private
- }
-
- // see https://spacetelescope.github.io/understanding-json-schema/UnderstandingJSONSchema.pdf
- private static final String PROPERTIES = "properties";
- private static final String ADDITIONAL_PROPERTIES = "additionalProperties";
- private static final String TYPE = "type";
- private static final String FORMAT = "format";
- private static final String CONTENT_ENCODING = "contentEncoding";
- private static final String ITEMS = "items";
- private static final String ADDITIONAL_ITEMS = "additionalItems";
- private static final String REF = "$ref";
- private static final String ALL_OF = "allOf";
- private static final String ANY_OF = "anyOf";
- private static final String NOT = "not";
- private static final String ONE_OF = "oneOf";
-
- // from https://tools.ietf.org/html/draft-zyp-json-schema-03#page-14
- private static final String DISALLOW = "disallow";
- private static final String EXTENDS = "extends";
-
- private static final String TYPE_NULL = "null";
- private static final String TYPE_BOOLEAN = "boolean";
- private static final String TYPE_OBJECT = "object";
- private static final String TYPE_ARRAY = "array";
- private static final String TYPE_NUMBER = "number";
- private static final String TYPE_INTEGER = "integer";
- private static final String TYPE_STRING = "string";
-
- private static final String FORMAT_DATE = "date";
- private static final String FORMAT_TIME = "time";
- private static final String FORMAT_DATE_TIME = "date-time";
-
- private static final String CONTENT_ENCODING_BASE64 = "base64";
-
- /**
- * Converts a JSON schema into Flink's type information. Throws an exception if the schema
- * cannot converted because of loss of precision or too flexible schema.
- *
- * <p>The converter can resolve simple schema references to solve those cases where entities
- * are defined at the beginning and then used throughout a document.
- */
- @SuppressWarnings("unchecked")
- public static <T> TypeInformation<T> convert(String jsonSchema) {
- Preconditions.checkNotNull(jsonSchema, "JSON schema");
- final ObjectMapper mapper = new ObjectMapper();
- mapper.getFactory()
- .enable(JsonParser.Feature.ALLOW_COMMENTS)
- .enable(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES)
- .enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);
- final JsonNode node;
- try {
- node = mapper.readTree(jsonSchema);
- } catch (IOException e) {
- throw new IllegalArgumentException(
- "Invalid JSON schema.", e);
- }
- return (TypeInformation<T>) convertType("<root>", node, node);
- }
-
- private static TypeInformation<?> convertType(String location, JsonNode node, JsonNode root) {
- // we use a set here to unify types (e.g. types that just add metadata such as 'multipleOf')
- final Set<TypeInformation<?>> typeSet = new HashSet<>();
-
- // search for ref
- final Optional<JsonNode> ref;
- if (node.has(REF) && node.get(REF).isTextual()) {
- // try a simple ref resolver to solve those cases where entities are defined at
- // the beginning and then used throughout a document
- ref = Optional.of(resolveReference(node.get(REF).asText(), node, root));
- } else {
- ref = Optional.empty();
- }
-
- // use TYPE of this node
- if (node.has(TYPE)) {
- final JsonNode typeNode = node.get(TYPE);
-
- List<String> types = new ArrayList<>();
- // array of types
- if (typeNode.isArray()) {
- final Iterator<JsonNode> elements = typeNode.elements();
- while (elements.hasNext()) {
- types.add(elements.next().asText());
- }
- }
- // single type
- else if (typeNode.isTextual()) {
- types.add(typeNode.asText());
- }
-
- for (String type : types) {
- // set field type
- switch (type) {
- case TYPE_NULL:
- typeSet.add(Types.VOID);
- break;
- case TYPE_BOOLEAN:
- typeSet.add(Types.BOOLEAN);
- break;
- case TYPE_STRING:
- if (node.has(FORMAT)) {
- typeSet.add(convertStringFormat(location, node.get(FORMAT)));
- } else if (node.has(CONTENT_ENCODING)) {
- typeSet.add(convertStringEncoding(location, node.get(CONTENT_ENCODING)));
- } else {
- typeSet.add(Types.STRING);
- }
- break;
- case TYPE_NUMBER:
- typeSet.add(Types.BIG_DEC);
- break;
- case TYPE_INTEGER:
- // use BigDecimal for easier interoperability
- // without affecting the correctness of the result
- typeSet.add(Types.BIG_DEC);
- break;
- case TYPE_OBJECT:
- typeSet.add(convertObject(location, node, root));
- break;
- case TYPE_ARRAY:
- typeSet.add(convertArray(location, node, root));
- break;
- default:
- throw new IllegalArgumentException(
- "Unsupported type '" + node.get(TYPE).asText() + "' in node: " + location);
- }
- }
- }
- // use TYPE of reference as fallback if present
- else {
- ref.filter(r -> r.has(TYPE)).ifPresent(r -> typeSet.add(convertType(node.get(REF).asText(), r, root)));
- }
-
- // simple interpretation of ONE_OF for supporting "object or null"
- if (node.has(ONE_OF) && node.get(ONE_OF).isArray()) {
- final TypeInformation<?>[] types = convertTypes(location + '/' + ONE_OF, node.get(ONE_OF), root);
- typeSet.addAll(Arrays.asList(types));
- }
- // use ONE_OF of reference as fallback
- else if (ref.isPresent() && ref.get().has(ONE_OF) && ref.get().get(ONE_OF).isArray()) {
- final TypeInformation<?>[] types = convertTypes(node.get(REF).asText() + '/' + ONE_OF, ref.get().get(ONE_OF), root);
- typeSet.addAll(Arrays.asList(types));
- }
-
- // validate no union types or extending
- if (node.has(ALL_OF) || node.has(ANY_OF) || node.has(NOT) || node.has(EXTENDS) || node.has(DISALLOW)) {
- throw new IllegalArgumentException(
- "Union types are such as '" + ALL_OF + "', '" + ANY_OF + "' etc. " +
- "and extending are not supported yet.");
- }
-
- // only a type (with null) is supported yet
- final List<TypeInformation<?>> types = new ArrayList<>(typeSet);
- if (types.size() == 0) {
- throw new IllegalArgumentException("No type could be found in node:" + location);
- } else if (types.size() > 2 || (types.size() == 2 && !types.contains(Types.VOID))) {
- throw new IllegalArgumentException(
- "Union types with more than just a null type are not supported yet.");
- }
-
- // return the first non-void type or void
- if (types.size() == 2 && types.get(0) == Types.VOID) {
- return types.get(1);
- } else {
- return types.get(0);
- }
- }
-
- private static TypeInformation<Row> convertObject(String location, JsonNode node, JsonNode root) {
- // validate properties
- if (!node.has(PROPERTIES)) {
- return Types.ROW();
- }
- if (!node.isObject()) {
- throw new IllegalArgumentException(
- "Invalid '" + PROPERTIES + "' property for object type in node: " + location);
- }
- final JsonNode props = node.get(PROPERTIES);
- final String[] names = new String[props.size()];
- final TypeInformation<?>[] types = new TypeInformation[props.size()];
-
- final Iterator<Map.Entry<String, JsonNode>> fieldIter = props.fields();
- int i = 0;
- while (fieldIter.hasNext()) {
- final Map.Entry<String, JsonNode> subNode = fieldIter.next();
-
- // set field name
- names[i] = subNode.getKey();
-
- // set type
- types[i] = convertType(location + '/' + subNode.getKey(), subNode.getValue(), root);
-
- i++;
- }
-
- // validate that object does not contain additional properties
- if (node.has(ADDITIONAL_PROPERTIES) && node.get(ADDITIONAL_PROPERTIES).isBoolean() &&
- node.get(ADDITIONAL_PROPERTIES).asBoolean()) {
- throw new IllegalArgumentException(
- "An object must not allow additional properties in node: " + location);
- }
-
- return Types.ROW_NAMED(names, types);
- }
-
- private static TypeInformation<?> convertArray(String location, JsonNode node, JsonNode root) {
- // validate items
- if (!node.has(ITEMS)) {
- throw new IllegalArgumentException(
- "Arrays must specify an '" + ITEMS + "' property in node: " + location);
- }
- final JsonNode items = node.get(ITEMS);
-
- // list (translated to object array)
- if (items.isObject()) {
- final TypeInformation<?> elementType = convertType(
- location + '/' + ITEMS,
- items,
- root);
- // result type might either be ObjectArrayTypeInfo or BasicArrayTypeInfo for Strings
- return Types.OBJECT_ARRAY(elementType);
- }
- // tuple (translated to row)
- else if (items.isArray()) {
- final TypeInformation<?>[] types = convertTypes(location + '/' + ITEMS, items, root);
-
- // validate that array does not contain additional items
- if (node.has(ADDITIONAL_ITEMS) && node.get(ADDITIONAL_ITEMS).isBoolean() &&
- node.get(ADDITIONAL_ITEMS).asBoolean()) {
- throw new IllegalArgumentException(
- "An array tuple must not allow additional items in node: " + location);
- }
-
- return Types.ROW(types);
- }
- throw new IllegalArgumentException(
- "Invalid type for '" + ITEMS + "' property in node: " + location);
- }
-
- private static TypeInformation<?> convertStringFormat(String location, JsonNode node) {
- if (!node.isTextual()) {
- throw new IllegalArgumentException("Invalid '" + FORMAT + "' property in node: " + location);
- }
-
- switch (node.asText()) {
- case FORMAT_DATE:
- return Types.SQL_DATE;
- case FORMAT_TIME:
- return Types.SQL_TIME;
- case FORMAT_DATE_TIME:
- return Types.SQL_TIMESTAMP;
- default:
- return Types.STRING; // unlikely that we will support other formats in the future
- }
- }
-
- private static TypeInformation<?> convertStringEncoding(String location, JsonNode node) {
- if (!node.isTextual()) {
- throw new IllegalArgumentException("Invalid '" + CONTENT_ENCODING + "' property in node: " + location);
- }
-
- // "If the instance value is a string, this property defines that the string SHOULD
- // be interpreted as binary data and decoded using the encoding named by this property."
-
- switch (node.asText()) {
- case CONTENT_ENCODING_BASE64:
- return Types.PRIMITIVE_ARRAY(Types.BYTE);
- default:
- // we fail hard here:
- // this gives us the chance to support more encodings in the future without problems
- // of backwards compatibility
- throw new IllegalArgumentException("Invalid encoding '" + node.asText() + "' in node: " + location);
- }
- }
-
- private static JsonNode resolveReference(String ref, JsonNode origin, JsonNode root) {
- if (!ref.startsWith("#")) {
- throw new IllegalArgumentException("Only JSON schemes with simple references " +
- "(one indirection in the same document) are supported yet. But was: " + ref);
- }
- final String path = ref.substring(1);
- final JsonNode foundNode = root.at(path);
- if (foundNode.isMissingNode()) {
- throw new IllegalArgumentException("Could not find reference: " + ref);
- }
- // prevent obvious cyclic references
- if (foundNode == origin) {
- throw new IllegalArgumentException("Cyclic references are not supported:" + ref);
- }
- return foundNode;
- }
-
- private static TypeInformation<?>[] convertTypes(String location, JsonNode arrayNode, JsonNode root) {
- final TypeInformation<?>[] types = new TypeInformation[arrayNode.size()];
- final Iterator<JsonNode> elements = arrayNode.elements();
- int i = 0;
- while (elements.hasNext()) {
- final TypeInformation<?> elementType = convertType(
- location + '[' + i + ']',
- elements.next(),
- root);
- types[i] = elementType;
- i += 1;
- }
- return types;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
index 9c12191..035f05f 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.descriptors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.typeutils.TypeStringUtils;
+import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
@@ -81,7 +82,7 @@ public class Json extends FormatDescriptor {
*
* @param schemaType type information that describes the schema
*/
- public Json schema(TypeInformation<?> schemaType) {
+ public Json schema(TypeInformation<Row> schemaType) {
Preconditions.checkNotNull(schemaType);
this.schema = TypeStringUtils.writeTypeInfo(schemaType);
this.jsonSchema = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
index fea7cf5..49e1abc 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
@@ -45,7 +45,7 @@ public class JsonValidator extends FormatDescriptorValidator {
} else if (!deriveSchema && !hasSchema && !hasSchemaString) {
throw new ValidationException("A definition of a schema or JSON schema is required.");
} else if (hasSchema) {
- properties.validateType(FORMAT_SCHEMA, false);
+ properties.validateType(FORMAT_SCHEMA, true, false);
} else if (hasSchemaString) {
properties.validateString(FORMAT_JSON_SCHEMA, false, 1);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory b/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory
new file mode 100644
index 0000000..aec5846
--- /dev/null
+++ b/flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.formats.TableFormatFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.formats.json.JsonRowFormatFactory
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java
new file mode 100644
index 0000000..d763b90
--- /dev/null
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowFormatFactoryTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.descriptors.Descriptor;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Json;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.formats.DeserializationSchemaFactory;
+import org.apache.flink.table.formats.SerializationSchemaFactory;
+import org.apache.flink.table.formats.TableFormatFactoryService;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link JsonRowFormatFactory}.
+ */
+public class JsonRowFormatFactoryTest extends TestLogger {
+
+ private static final String JSON_SCHEMA =
+ "{" +
+ " 'title': 'Fruit'," +
+ " 'type': 'object'," +
+ " 'properties': {" +
+ " 'name': {" +
+ " 'type': 'string'" +
+ " }," +
+ " 'count': {" +
+ " 'type': 'integer'" +
+ " }," +
+ " 'time': {" +
+ " 'description': 'row time'," +
+ " 'type': 'string'," +
+ " 'format': 'date-time'" +
+ " }" +
+ " }," +
+ " 'required': ['name', 'count', 'time']" +
+ "}";
+
+ private static final TypeInformation<Row> SCHEMA = Types.ROW(
+ new String[]{"field1", "field2"},
+ new TypeInformation[]{Types.BOOLEAN(), Types.INT()});
+
+ @Test
+ public void testSchema() {
+ final Map<String, String> properties = toMap(
+ new Json()
+ .schema(SCHEMA)
+ .failOnMissingField(false));
+
+ testSchemaSerializationSchema(properties);
+
+ testSchemaDeserializationSchema(properties);
+ }
+
+ @Test
+ public void testJsonSchema() {
+ final Map<String, String> properties = toMap(
+ new Json()
+ .jsonSchema(JSON_SCHEMA)
+ .failOnMissingField(true));
+
+ testJsonSchemaSerializationSchema(properties);
+
+ testJsonSchemaDeserializationSchema(properties);
+ }
+
+ @Test
+ public void testSchemaDerivation() {
+ final Map<String, String> properties = toMap(
+ new Schema()
+ .field("field1", Types.BOOLEAN())
+ .field("field2", Types.INT())
+ .field("proctime", Types.SQL_TIMESTAMP()).proctime(),
+ new Json()
+ .deriveSchema());
+
+ testSchemaSerializationSchema(properties);
+
+ testSchemaDeserializationSchema(properties);
+ }
+
+ private void testSchemaDeserializationSchema(Map<String, String> properties) {
+ final DeserializationSchema<?> actual2 = TableFormatFactoryService
+ .find(DeserializationSchemaFactory.class, properties)
+ .createDeserializationSchema(properties);
+ final JsonRowDeserializationSchema expected2 = new JsonRowDeserializationSchema(SCHEMA);
+ expected2.setFailOnMissingField(false);
+ assertEquals(expected2, actual2);
+ }
+
+ private void testSchemaSerializationSchema(Map<String, String> properties) {
+ final SerializationSchema<?> actual1 = TableFormatFactoryService
+ .find(SerializationSchemaFactory.class, properties)
+ .createSerializationSchema(properties);
+ final SerializationSchema<?> expected1 = new JsonRowSerializationSchema(SCHEMA); // wildcard instead of raw type, as in testJsonSchemaSerializationSchema
+ assertEquals(expected1, actual1);
+ }
+
+ private void testJsonSchemaDeserializationSchema(Map<String, String> properties) {
+ final DeserializationSchema<?> actual2 = TableFormatFactoryService
+ .find(DeserializationSchemaFactory.class, properties)
+ .createDeserializationSchema(properties);
+ final JsonRowDeserializationSchema expected2 = new JsonRowDeserializationSchema(JSON_SCHEMA);
+ expected2.setFailOnMissingField(true);
+ assertEquals(expected2, actual2);
+ }
+
+ private void testJsonSchemaSerializationSchema(Map<String, String> properties) {
+ final SerializationSchema<?> actual1 = TableFormatFactoryService
+ .find(SerializationSchemaFactory.class, properties)
+ .createSerializationSchema(properties);
+ final SerializationSchema<?> expected1 = new JsonRowSerializationSchema(JSON_SCHEMA);
+ assertEquals(expected1, actual1);
+ }
+
+ private static Map<String, String> toMap(Descriptor... desc) {
+ final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+ for (Descriptor d : desc) {
+ d.addProperties(descriptorProperties);
+ }
+ return descriptorProperties.asMap();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSchemaConverterTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSchemaConverterTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSchemaConverterTest.java
new file mode 100644
index 0000000..1af45f4
--- /dev/null
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSchemaConverterTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link JsonRowSchemaConverter}.
+ */
+public class JsonRowSchemaConverterTest {
+
+ @Test
+ public void testComplexSchema() throws Exception {
+ final URL url = getClass().getClassLoader().getResource("complex-schema.json");
+ Objects.requireNonNull(url, "Test resource 'complex-schema.json' not found on the classpath.");
+ final String schema = FileUtils.readFileUtf8(new File(url.toURI())); // toURI() decodes escapes such as %20 that getFile() leaves in the path
+ final TypeInformation<?> result = JsonRowSchemaConverter.convert(schema);
+
+ final TypeInformation<?> expected = Types.ROW_NAMED(
+ new String[] {"fn", "familyName", "additionalName", "tuples", "honorificPrefix", "url",
+ "email", "tel", "sound", "org"},
+ Types.STRING, Types.STRING, Types.BOOLEAN, Types.ROW(Types.BIG_DEC, Types.STRING, Types.STRING, Types.STRING),
+ Types.OBJECT_ARRAY(Types.STRING), Types.STRING, Types.ROW_NAMED(new String[] {"type", "value"}, Types.STRING, Types.STRING),
+ Types.ROW_NAMED(new String[] {"type", "value"}, Types.BIG_DEC, Types.STRING), Types.VOID,
+ Types.ROW_NAMED(new String[] {"organizationUnit"}, Types.ROW()));
+
+ assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReferenceSchema() throws Exception {
+ final URL url = getClass().getClassLoader().getResource("reference-schema.json");
+ Objects.requireNonNull(url, "Test resource 'reference-schema.json' not found on the classpath.");
+ final String schema = FileUtils.readFileUtf8(new File(url.toURI())); // toURI() decodes escapes such as %20 that getFile() leaves in the path
+ final TypeInformation<?> result = JsonRowSchemaConverter.convert(schema);
+
+ final TypeInformation<?> expected = Types.ROW_NAMED(
+ new String[] {"billing_address", "shipping_address", "optional_address"},
+ Types.ROW_NAMED(new String[] {"street_address", "city", "state"}, Types.STRING, Types.STRING, Types.STRING),
+ Types.ROW_NAMED(new String[] {"street_address", "city", "state"}, Types.STRING, Types.STRING, Types.STRING),
+ Types.ROW_NAMED(new String[] {"street_address", "city", "state"}, Types.STRING, Types.STRING, Types.STRING));
+
+ assertEquals(expected, result);
+ }
+
+ @Test
+ public void testAtomicType() {
+ final TypeInformation<?> result = JsonRowSchemaConverter.convert("{ type: 'number' }");
+
+ assertEquals(Types.BIG_DEC, result);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMissingType() {
+ JsonRowSchemaConverter.convert("{ }");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongType() {
+ JsonRowSchemaConverter.convert("{ type: 'whatever' }");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testArrayWithAdditionalItems() {
+ JsonRowSchemaConverter.convert("{ type: 'array', items: [{type: 'integer'}], additionalItems: true }");
+ }
+
+ @Test
+ public void testMissingProperties() {
+ final TypeInformation<?> result = JsonRowSchemaConverter.convert("{ type: 'object' }");
+
+ assertEquals(Types.ROW(), result);
+ }
+
+ @Test
+ public void testNullUnionTypes() {
+ final TypeInformation<?> result = JsonRowSchemaConverter.convert("{ type: ['string', 'null'] }");
+
+ assertEquals(Types.STRING, result);
+ }
+
+ @Test
+ public void testTimestamp() {
+ final TypeInformation<?> result = JsonRowSchemaConverter.convert("{ type: 'string', format: 'date-time' }");
+
+ assertEquals(Types.SQL_TIMESTAMP, result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
index 94a05b3..e2410d4 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowSerializationSchemaTest.java
@@ -111,7 +111,7 @@ public class JsonRowSerializationSchemaTest {
@Test
public void testSchema() throws IOException {
- final TypeInformation<Row> rowSchema = JsonSchemaConverter.convert(
+ final TypeInformation<Row> rowSchema = JsonRowSchemaConverter.convert(
"{" +
" type: 'object'," +
" properties: {" +
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSchemaConverterTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSchemaConverterTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSchemaConverterTest.java
deleted file mode 100644
index 7cf3b7c..0000000
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonSchemaConverterTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.util.FileUtils;
-
-import org.junit.Test;
-
-import java.io.File;
-import java.net.URL;
-import java.util.Objects;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for {@link JsonSchemaConverter}.
- */
-public class JsonSchemaConverterTest {
-
- @Test
- public void testComplexSchema() throws Exception {
- final URL url = getClass().getClassLoader().getResource("complex-schema.json");
- Objects.requireNonNull(url);
- final String schema = FileUtils.readFileUtf8(new File(url.getFile()));
- final TypeInformation<?> result = JsonSchemaConverter.convert(schema);
-
- final TypeInformation<?> expected = Types.ROW_NAMED(
- new String[] {"fn", "familyName", "additionalName", "tuples", "honorificPrefix", "url",
- "email", "tel", "sound", "org"},
- Types.STRING, Types.STRING, Types.BOOLEAN, Types.ROW(Types.BIG_DEC, Types.STRING, Types.STRING, Types.STRING),
- Types.OBJECT_ARRAY(Types.STRING), Types.STRING, Types.ROW_NAMED(new String[] {"type", "value"}, Types.STRING, Types.STRING),
- Types.ROW_NAMED(new String[] {"type", "value"}, Types.BIG_DEC, Types.STRING), Types.VOID,
- Types.ROW_NAMED(new String[] {"organizationUnit"}, Types.ROW()));
-
- assertEquals(expected, result);
- }
-
- @Test
- public void testReferenceSchema() throws Exception {
- final URL url = getClass().getClassLoader().getResource("reference-schema.json");
- Objects.requireNonNull(url);
- final String schema = FileUtils.readFileUtf8(new File(url.getFile()));
- final TypeInformation<?> result = JsonSchemaConverter.convert(schema);
-
- final TypeInformation<?> expected = Types.ROW_NAMED(
- new String[] {"billing_address", "shipping_address", "optional_address"},
- Types.ROW_NAMED(new String[] {"street_address", "city", "state"}, Types.STRING, Types.STRING, Types.STRING),
- Types.ROW_NAMED(new String[] {"street_address", "city", "state"}, Types.STRING, Types.STRING, Types.STRING),
- Types.ROW_NAMED(new String[] {"street_address", "city", "state"}, Types.STRING, Types.STRING, Types.STRING));
-
- assertEquals(expected, result);
- }
-
- @Test
- public void testAtomicType() {
- final TypeInformation<?> result = JsonSchemaConverter.convert("{ type: 'number' }");
-
- assertEquals(Types.BIG_DEC, result);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testMissingType() {
- JsonSchemaConverter.convert("{ }");
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testWrongType() {
- JsonSchemaConverter.convert("{ type: 'whatever' }");
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testArrayWithAdditionalItems() {
- JsonSchemaConverter.convert("{ type: 'array', items: [{type: 'integer'}], additionalItems: true }");
- }
-
- @Test
- public void testMissingProperties() {
- final TypeInformation<?> result = JsonSchemaConverter.convert("{ type: 'object' }");
-
- assertEquals(Types.ROW(), result);
- }
-
- @Test
- public void testNullUnionTypes() {
- final TypeInformation<?> result = JsonSchemaConverter.convert("{ type: ['string', 'null'] }");
-
- assertEquals(Types.STRING, result);
- }
-
- @Test
- public void testTimestamp() {
- final TypeInformation<?> result = JsonSchemaConverter.convert("{ type: 'string', format: 'date-time' }");
-
- assertEquals(Types.SQL_TIMESTAMP, result);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 05255fd..043a345 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.table.descriptors.{BatchTableSourceDescriptor, ConnectorDescriptor}
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.{Expression, TimeAttribute}
import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -110,16 +111,44 @@ abstract class BatchTableEnvironment(
}
}
-// TODO expose this once we have enough table source factories that can deal with it
-// /**
-// * Creates a table from a descriptor that describes the source connector, source encoding,
-// * the resulting table schema, and other properties.
-// *
-// * @param connectorDescriptor connector descriptor describing the source of the table
-// */
-// def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = {
-// new BatchTableSourceDescriptor(this, connectorDescriptor)
-// }
+ /**
+ * Creates a table from a descriptor that describes the source connector, the source format,
+ * the resulting table schema, and other properties.
+ *
+ * Descriptors allow for declaring communication to external systems in an
+ * implementation-agnostic way. The classpath is scanned for connectors and matching connectors
+ * are configured accordingly.
+ *
+ * The following example shows how to read from a Kafka connector using a JSON format and
+ * create a table:
+ *
+ * {{{
+ *
+ * tableEnv
+ * .from(
+ * new Kafka()
+ * .version("0.11")
+ * .topic("clicks")
+ * .property("zookeeper.connect", "localhost")
+ * .property("group.id", "click-group")
+ * .startFromEarliest())
+ * .withFormat(
+ * new Json()
+ * .jsonSchema("{...}")
+ * .failOnMissingField(false))
+ * .withSchema(
+ * new Schema()
+ * .field("user-name", "VARCHAR").from("u_name")
+ * .field("count", "DECIMAL")
+ * .field("proc-time", "TIMESTAMP").proctime())
+ * .toTable()
+ * }}}
+ *
+ * @param connectorDescriptor connector descriptor describing the source of the table
+ */
+ def from(connectorDescriptor: ConnectorDescriptor): BatchTableSourceDescriptor = {
+ new BatchTableSourceDescriptor(this, connectorDescriptor)
+ }
/**
* Registers an external [[TableSink]] with given field names and types in this
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 84d7240..510fe0d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter}
+import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableSourceDescriptor}
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions._
import org.apache.flink.table.plan.nodes.FlinkConventions
@@ -128,16 +129,44 @@ abstract class StreamTableEnvironment(
}
}
-// TODO expose this once we have enough table source factories that can deal with it
-// /**
-// * Creates a table from a descriptor that describes the source connector, source encoding,
-// * the resulting table schema, and other properties.
-// *
-// * @param connectorDescriptor connector descriptor describing the source of the table
-// */
-// def from(connectorDescriptor: ConnectorDescriptor): StreamTableSourceDescriptor = {
-// new StreamTableSourceDescriptor(this, connectorDescriptor)
-// }
+ /**
+ * Creates a table from a descriptor that describes the source connector, the source format,
+ * the resulting table schema, and other properties.
+ *
+ * Descriptors allow for declaring communication to external systems in an
+ * implementation-agnostic way. The classpath is scanned for connectors and matching connectors
+ * are configured accordingly.
+ *
+ * The following example shows how to read from a Kafka connector using a JSON format and
+ * create a table:
+ *
+ * {{{
+ *
+ * tableEnv
+ * .from(
+ * new Kafka()
+ * .version("0.11")
+ * .topic("clicks")
+ * .property("zookeeper.connect", "localhost")
+ * .property("group.id", "click-group")
+ * .startFromEarliest())
+ * .withFormat(
+ * new Json()
+ * .jsonSchema("{...}")
+ * .failOnMissingField(false))
+ * .withSchema(
+ * new Schema()
+ * .field("user-name", "VARCHAR").from("u_name")
+ * .field("count", "DECIMAL")
+ * .field("proc-time", "TIMESTAMP").proctime())
+ * .toTable()
+ * }}}
+ *
+ * @param connectorDescriptor connector descriptor describing the source of the table
+ */
+ def from(connectorDescriptor: ConnectorDescriptor): StreamTableSourceDescriptor = {
+ new StreamTableSourceDescriptor(this, connectorDescriptor)
+ }
/**
* Registers an external [[TableSink]] with given field names and types in this
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index d6106be..88dc1e9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -49,6 +49,7 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
+import org.apache.flink.table.descriptors.{ConnectorDescriptor, TableSourceDescriptor}
import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
@@ -511,6 +512,43 @@ abstract class TableEnvironment(val config: TableConfig) {
}
}
+ /**
+ * Creates a table from a descriptor that describes the source connector, the source format,
+ * the resulting table schema, and other properties.
+ *
+ * Descriptors allow for declaring communication to external systems in an
+ * implementation-agnostic way. The classpath is scanned for connectors and matching connectors
+ * are configured accordingly.
+ *
+ * The following example shows how to read from a Kafka connector using a JSON format and
+ * create a table:
+ *
+ * {{{
+ *
+ * tableEnv
+ * .from(
+ * new Kafka()
+ * .version("0.11")
+ * .topic("clicks")
+ * .property("zookeeper.connect", "localhost")
+ * .property("group.id", "click-group")
+ * .startFromEarliest())
+ * .withFormat(
+ * new Json()
+ * .jsonSchema("{...}")
+ * .failOnMissingField(false))
+ * .withSchema(
+ * new Schema()
+ * .field("user-name", "VARCHAR").from("u_name")
+ * .field("count", "DECIMAL")
+ * .field("proc-time", "TIMESTAMP").proctime())
+ * .toTable()
+ * }}}
+ *
+ * @param connectorDescriptor connector descriptor describing the source of the table
+ */
+ def from(connectorDescriptor: ConnectorDescriptor): TableSourceDescriptor
+
private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = {
require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
val schemaPaths = tablePath.slice(0, tablePath.length - 1)
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
index 6389b55..bcee5ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
@@ -24,6 +24,7 @@ import _root_.scala.collection.mutable.ArrayBuffer
import _root_.java.util.Objects
import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.types.Row
/**
* A TableSchema represents a Table's structure.
@@ -135,6 +136,15 @@ class TableSchema(
new TableSchema(columnNames, converted)
}
+ /**
+ * Converts a table schema into a (nested) type information describing a [[Row]].
+ *
+ * @return type information where columns are fields of a row
+ */
+ def toRowType: TypeInformation[Row] = {
+ Types.ROW(getColumnNames, getTypes)
+ }
+
override def toString: String = {
val builder = new StringBuilder
builder.append("root\n")
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index eb87c9d..e266a47 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.api
import org.apache.flink.table.descriptors.DescriptorProperties
+import org.apache.flink.table.formats.TableFormatFactory
/**
* Exception for all errors occurring during expression parsing.
@@ -142,6 +143,85 @@ case class CatalogAlreadyExistException(
}
/**
+ * Exception for not finding a [[org.apache.flink.table.formats.TableFormatFactory]] for the
+ * given properties.
+ *
+ * @param message message that indicates the current matching step
+ * @param factoryClass required factory class
+ * @param formatFactories all found factories
+ * @param properties properties that describe the table format
+ * @param cause the cause
+ */
+case class NoMatchingTableFormatException(
+ message: String,
+ factoryClass: Class[_],
+ formatFactories: Seq[TableFormatFactory[_]],
+ properties: Map[String, String],
+ cause: Throwable)
+ extends RuntimeException(
+ s"""Could not find a suitable table format factory for '${factoryClass.getName}' in
+ |the classpath.
+ |
+ |Reason: $message
+ |
+ |The following properties are requested:
+ |${DescriptorProperties.toString(properties)}
+ |
+ |The following format factories have been considered:
+ |${formatFactories.map(_.getClass.getName).mkString("\n")}
+ |""".stripMargin,
+ cause) {
+
+ def this(
+ message: String,
+ factoryClass: Class[_],
+ formatFactories: Seq[TableFormatFactory[_]],
+ properties: Map[String, String]) = {
+ this(message, factoryClass, formatFactories, properties, null)
+ }
+}
+
+/**
+ * Exception for finding more than one [[org.apache.flink.table.formats.TableFormatFactory]] for
+ * the given properties.
+ *
+ * @param matchingFormatFactories format factories that match the properties
+ * @param factoryClass required factory class
+ * @param formatFactories all found factories
+ * @param properties properties that describe the table format
+ * @param cause the cause
+ */
+case class AmbiguousTableFormatException(
+ matchingFormatFactories: Seq[TableFormatFactory[_]],
+ factoryClass: Class[_],
+ formatFactories: Seq[TableFormatFactory[_]],
+ properties: Map[String, String],
+ cause: Throwable)
+ extends RuntimeException(
+ s"""More than one suitable table format factory for '${factoryClass.getName}' could
+ |be found in the classpath.
+ |
+ |The following format factories match:
+ |${matchingFormatFactories.map(_.getClass.getName).mkString("\n")}
+ |
+ |The following properties are requested:
+ |${DescriptorProperties.toString(properties)}
+ |
+ |The following format factories have been considered:
+ |${formatFactories.map(_.getClass.getName).mkString("\n")}
+ |""".stripMargin,
+ cause) {
+
+ def this(
+ matchingFormatFactories: Seq[TableFormatFactory[_]],
+ factoryClass: Class[_],
+ formatFactories: Seq[TableFormatFactory[_]],
+ properties: Map[String, String]) = {
+ this(matchingFormatFactories, factoryClass, formatFactories, properties, null)
+ }
+}
+
+/**
* Exception for not finding a [[org.apache.flink.table.sources.TableSourceFactory]] for the
* given properties.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
index 4f5bd81..812b78a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
@@ -30,6 +30,7 @@ import org.apache.commons.codec.binary.Base64
import org.apache.commons.lang.StringEscapeUtils
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.{TableException, TableSchema, ValidationException}
import org.apache.flink.table.descriptors.DescriptorProperties.{NAME, TYPE, normalizeTableSchema, toJava}
import org.apache.flink.table.typeutils.TypeStringUtils
@@ -950,7 +951,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
validateString(prefix + NAME, isOptional = false, minLen = 1)
}
val typeValidation = (prefix: String) => {
- validateType(prefix + TYPE, isOptional = false)
+ validateType(prefix + TYPE, requireRow = false, isOptional = false)
}
validateFixedIndexedProperties(
@@ -998,13 +999,19 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
/**
* Validates a type property.
*/
- def validateType(key: String, isOptional: Boolean): Unit = {
+ def validateType(key: String, requireRow: Boolean, isOptional: Boolean): Unit = {
if (!properties.contains(key)) {
if (!isOptional) {
throw new ValidationException(s"Could not find required property '$key'.")
}
} else {
- TypeStringUtils.readTypeInfo(properties(key)) // throws validation exceptions
+ // we don't validate the string but let the parser do the work for us
+ // it throws a validation exception
+ val info = TypeStringUtils.readTypeInfo(properties(key))
+ if (requireRow && !info.isInstanceOf[RowTypeInfo]) {
+ throw new ValidationException(
+ s"Row type information expected for '$key' but was: ${properties(key)}")
+ }
}
}
@@ -1079,7 +1086,7 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
*/
private def put(key: String, value: String): Unit = {
if (properties.contains(key)) {
- throw new IllegalStateException("Property already present.")
+ throw new IllegalStateException("Property already present:" + key)
}
if (normalizeKeys) {
properties.put(key.toLowerCase, value)
@@ -1263,7 +1270,7 @@ object DescriptorProperties {
}
def toString(keyOrValue: String): String = {
- StringEscapeUtils.escapeJava(keyOrValue)
+ StringEscapeUtils.escapeJava(keyOrValue).replace("\\/", "/") // '/' must not be escaped
}
def toString(key: String, value: String): String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
index 301189a..3d44c24 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/FormatDescriptorValidator.scala
@@ -34,6 +34,11 @@ class FormatDescriptorValidator extends DescriptorValidator {
object FormatDescriptorValidator {
/**
+ * Prefix for format-related properties.
+ */
+ val FORMAT = "format"
+
+ /**
* Key for describing the type of the format. Usually used for factory discovery.
*/
val FORMAT_TYPE = "format.type"
http://git-wip-us.apache.org/repos/asf/flink/blob/ee40335f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
index fdec820..160347e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
@@ -28,7 +28,11 @@ import scala.collection.JavaConverters._
/**
* Validator for [[Rowtime]].
*/
-class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
+class RowtimeValidator(
+ supportsSourceTimestamps: Boolean,
+ supportsSourceWatermarks: Boolean,
+ prefix: String = "")
+ extends DescriptorValidator {
override def validate(properties: DescriptorProperties): Unit = {
val timestampExistingField = (_: String) => {
@@ -43,14 +47,21 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
prefix + ROWTIME_TIMESTAMPS_SERIALIZED, isOptional = false, minLen = 1)
}
- properties.validateEnum(
- prefix + ROWTIME_TIMESTAMPS_TYPE,
- isOptional = false,
+ val timestampsValidation = if (supportsSourceTimestamps) {
Map(
ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> toJava(timestampExistingField),
ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE -> properties.noValidation(),
- ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom)
- ).asJava
+ ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom))
+ } else {
+ Map(
+ ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> toJava(timestampExistingField),
+ ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom))
+ }
+
+ properties.validateEnum(
+ prefix + ROWTIME_TIMESTAMPS_TYPE,
+ isOptional = false,
+ timestampsValidation.asJava
)
val watermarkPeriodicBounded = (_: String) => {
@@ -65,15 +76,23 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
prefix + ROWTIME_WATERMARKS_SERIALIZED, isOptional = false, minLen = 1)
}
- properties.validateEnum(
- prefix + ROWTIME_WATERMARKS_TYPE,
- isOptional = false,
+ val watermarksValidation = if (supportsSourceWatermarks) {
Map(
ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> properties.noValidation(),
ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED -> toJava(watermarkPeriodicBounded),
ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE -> properties.noValidation(),
- ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom)
- ).asJava
+ ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom))
+ } else {
+ Map(
+ ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> properties.noValidation(),
+ ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED -> toJava(watermarkPeriodicBounded),
+ ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom))
+ }
+
+ properties.validateEnum(
+ prefix + ROWTIME_WATERMARKS_TYPE,
+ isOptional = false,
+ watermarksValidation.asJava
)
}
}
@@ -154,7 +173,7 @@ object RowtimeValidator {
new ExistingField(field)
case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE =>
- new StreamRecordTimestamp
+ StreamRecordTimestamp.INSTANCE
case ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM =>
val clazz = properties.getClass(