You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/19 13:35:49 UTC
[flink] 01/02: [FLINK-13286][table-api] Port RowtimeValidator and
SchemaValidator to api-java-bridge
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 86ece85369ad3e53e23dda57a0e95483942a30b0
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jul 18 20:57:12 2019 +0800
[FLINK-13286][table-api] Port RowtimeValidator and SchemaValidator to api-java-bridge
This closes #9168
---
.../flink/table/descriptors/RowtimeValidator.java | 176 +++++++++++++
.../flink/table/descriptors/SchemaValidator.java | 287 +++++++++++++++++++++
.../flink/table/descriptors/RowtimeValidator.scala | 156 -----------
.../flink/table/descriptors/SchemaValidator.scala | 275 --------------------
.../table/sinks/CsvTableSinkFactoryBase.scala | 5 +-
.../table/sources/CsvTableSourceFactoryBase.scala | 5 +-
.../flink/table/descriptors/RowtimeTest.scala | 2 +-
.../flink/table/descriptors/SchemaTest.scala | 5 +-
.../flink/table/utils/InMemoryTableFactory.scala | 10 +-
9 files changed, 469 insertions(+), 452 deletions(-)
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/RowtimeValidator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/RowtimeValidator.java
new file mode 100644
index 0000000..9061a02
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/RowtimeValidator.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sources.tsextractors.ExistingField;
+import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
+import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
+import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks;
+import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED;
+
+/**
+ * Validator for {@link Rowtime}.
+ */
+@PublicEvolving
+public class RowtimeValidator implements DescriptorValidator {
+
+	private final boolean supportsSourceTimestamps;
+	private final boolean supportsSourceWatermarks;
+	private final String prefix;
+	/** Creates a validator for rowtime properties whose keys carry no prefix. */
+	public RowtimeValidator(boolean supportsSourceTimestamps, boolean supportsSourceWatermarks) {
+		this(supportsSourceTimestamps, supportsSourceWatermarks, "");
+	}
+	/** Creates a validator; {@code prefix} is prepended to every rowtime property key it reads. */
+	public RowtimeValidator(boolean supportsSourceTimestamps, boolean supportsSourceWatermarks, String prefix) {
+		this.supportsSourceTimestamps = supportsSourceTimestamps;
+		this.supportsSourceWatermarks = supportsSourceWatermarks;
+		this.prefix = prefix;
+	}
+
+	@Override
+	public void validate(DescriptorProperties properties) {
+		Consumer<String> timestampExistingField =
+			s -> properties.validateString(prefix + ROWTIME_TIMESTAMPS_FROM, false, 1);
+
+		Consumer<String> timestampCustom = s -> {
+			properties.validateString(prefix + ROWTIME_TIMESTAMPS_CLASS, false, 1);
+			properties.validateString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED, false, 1);
+		};
+
+		Map<String, Consumer<String>> timestampsValidation = new HashMap<>();
+		if (supportsSourceTimestamps) {
+			timestampsValidation.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD, timestampExistingField);
+			timestampsValidation.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE, DescriptorProperties.noValidation());
+			timestampsValidation.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM, timestampCustom);
+		} else { // same types as above, minus the from-source timestamps type
+			timestampsValidation.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD, timestampExistingField);
+			timestampsValidation.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM, timestampCustom);
+		}
+
+		properties.validateEnum(prefix + ROWTIME_TIMESTAMPS_TYPE, false, timestampsValidation);
+
+		Consumer<String> watermarkPeriodicBounded =
+			s -> properties.validateLong(prefix + ROWTIME_WATERMARKS_DELAY, false, 0);
+
+		Consumer<String> watermarkCustom = s -> {
+			properties.validateString(prefix + ROWTIME_WATERMARKS_CLASS, false, 1);
+			properties.validateString(prefix + ROWTIME_WATERMARKS_SERIALIZED, false, 1);
+		};
+
+		Map<String, Consumer<String>> watermarksValidation = new HashMap<>();
+		if (supportsSourceWatermarks) {
+			watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING, DescriptorProperties.noValidation());
+			watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED, watermarkPeriodicBounded);
+			watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE, DescriptorProperties.noValidation());
+			watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM, watermarkCustom);
+		} else { // same types as above, minus the from-source watermarks type
+			watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING, DescriptorProperties.noValidation());
+			watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED, watermarkPeriodicBounded);
+			watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM, watermarkCustom);
+		}
+
+		properties.validateEnum(prefix + ROWTIME_WATERMARKS_TYPE, false, watermarksValidation);
+	}
+
+	// utilities
+	/** Returns the extractor and strategy configured under {@code prefix}; empty if no timestamps type is set. */
+	public static Optional<Tuple2<TimestampExtractor, WatermarkStrategy>> getRowtimeComponents(
+			DescriptorProperties properties, String prefix) {
+		// create timestamp extractor
+		TimestampExtractor extractor;
+		Optional<String> t = properties.getOptionalString(prefix + ROWTIME_TIMESTAMPS_TYPE);
+		if (!t.isPresent()) {
+			return Optional.empty();
+		}
+
+		switch (t.get()) {
+			case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD:
+				String field = properties.getString(prefix + ROWTIME_TIMESTAMPS_FROM);
+				extractor = new ExistingField(field);
+				break;
+			case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE:
+				extractor = StreamRecordTimestamp.INSTANCE;
+				break;
+			case ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM:
+				Class<TimestampExtractor> clazz = properties.getClass(
+					prefix + ROWTIME_TIMESTAMPS_CLASS, TimestampExtractor.class);
+				extractor = EncodingUtils.decodeStringToObject(
+					properties.getString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED),
+					clazz);
+				break;
+			default:
+				throw new ValidationException("Unsupported rowtime timestamps type: " + t.get());
+		}
+
+		// create watermark strategy
+		WatermarkStrategy strategy;
+		String s = properties.getString(prefix + ROWTIME_WATERMARKS_TYPE);
+		switch (s) {
+			case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING:
+				strategy = new AscendingTimestamps();
+				break;
+			case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED:
+				long delay = properties.getLong(prefix + ROWTIME_WATERMARKS_DELAY);
+				strategy = new BoundedOutOfOrderTimestamps(delay);
+				break;
+			case ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE:
+				strategy = PreserveWatermarks.INSTANCE;
+				break;
+			case ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM:
+				Class<WatermarkStrategy> clazz = properties.getClass(
+					prefix + ROWTIME_WATERMARKS_CLASS, WatermarkStrategy.class);
+				strategy = EncodingUtils.decodeStringToObject(
+					properties.getString(prefix + ROWTIME_WATERMARKS_SERIALIZED),
+					clazz);
+				break;
+			default:
+				throw new ValidationException("Unsupported rowtime watermarks type: " + s);
+		}
+
+		return Optional.of(new Tuple2<>(extractor, strategy));
+	}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
new file mode 100644
index 0000000..2ad92c1
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.TableFormatFactory;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
+import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.lang.String.format;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Validator for {@link Schema}.
+ */
+@PublicEvolving
+public class SchemaValidator implements DescriptorValidator {
+
+	private final boolean isStreamEnvironment;
+	private final boolean supportsSourceTimestamps;
+	private final boolean supportsSourceWatermarks;
+	/** Proctime fields require {@code isStreamEnvironment}; the source flags are passed to {@link RowtimeValidator}. */
+	public SchemaValidator(boolean isStreamEnvironment, boolean supportsSourceTimestamps,
+			boolean supportsSourceWatermarks) {
+		this.isStreamEnvironment = isStreamEnvironment;
+		this.supportsSourceTimestamps = supportsSourceTimestamps;
+		this.supportsSourceWatermarks = supportsSourceWatermarks;
+	}
+
+	@Override
+	public void validate(DescriptorProperties properties) {
+		Map<String, String> names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME);
+		Map<String, String> types = properties.getIndexedProperty(SCHEMA, SCHEMA_TYPE);
+
+		if (names.isEmpty() && types.isEmpty()) {
+			throw new ValidationException(
+				format("Could not find the required schema in property '%s'.", SCHEMA));
+		}
+
+		boolean proctimeFound = false;
+
+		for (int i = 0; i < Math.max(names.size(), types.size()); i++) {
+			properties.validateString(SCHEMA + "." + i + "." + SCHEMA_NAME, false, 1);
+			properties.validateType(SCHEMA + "." + i + "." + SCHEMA_TYPE, false, false);
+			properties.validateString(SCHEMA + "." + i + "." + SCHEMA_FROM, true, 1);
+			// either proctime or rowtime
+			String proctime = SCHEMA + "." + i + "." + SCHEMA_PROCTIME;
+			String rowtime = SCHEMA + "." + i + "." + ROWTIME;
+			if (properties.containsKey(proctime)) {
+				// check the environment
+				if (!isStreamEnvironment) {
+					throw new ValidationException(
+						format("Property '%s' is not allowed in a batch environment.", proctime));
+				}
+				// check for only one proctime attribute
+				else if (proctimeFound) {
+					throw new ValidationException("A proctime attribute must only be defined once.");
+				}
+				// check proctime
+				properties.validateBoolean(proctime, false);
+				proctimeFound = properties.getBoolean(proctime);
+				// no rowtime
+				properties.validatePrefixExclusion(rowtime);
+			} else if (properties.hasPrefix(rowtime)) {
+				// check rowtime
+				RowtimeValidator rowtimeValidator = new RowtimeValidator(
+					supportsSourceTimestamps,
+					supportsSourceWatermarks,
+					SCHEMA + "." + i + ".");
+				rowtimeValidator.validate(properties);
+				// no proctime
+				properties.validateExclusion(proctime);
+			}
+		}
+	}
+
+	/**
+	 * Returns keys for a {@link TableFormatFactory#supportedProperties()} method that
+	 * are accepted for schema derivation using {@code deriveFormatFields(DescriptorProperties)}.
+	 */
+	public static List<String> getSchemaDerivationKeys() {
+		List<String> keys = new ArrayList<>();
+
+		// schema
+		keys.add(SCHEMA + ".#." + SCHEMA_TYPE);
+		keys.add(SCHEMA + ".#." + SCHEMA_NAME);
+		keys.add(SCHEMA + ".#." + SCHEMA_FROM);
+
+		// time attributes
+		keys.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
+		keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
+		keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
+		keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS);
+		keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED);
+		keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
+		keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS);
+		keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED);
+		keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY);
+
+		return keys;
+	}
+
+	/**
+	 * Finds the proctime attribute if defined.
+	 */
+	public static Optional<String> deriveProctimeAttribute(DescriptorProperties properties) {
+		Map<String, String> names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME);
+
+		for (int i = 0; i < names.size(); i++) {
+			Optional<Boolean> isProctime = properties.getOptionalBoolean(SCHEMA + "." + i + "." + SCHEMA_PROCTIME);
+			if (isProctime.isPresent() && isProctime.get()) {
+				return Optional.of(names.get(SCHEMA + "." + i + "." + SCHEMA_NAME));
+			}
+		}
+		return Optional.empty();
+	}
+
+	/**
+	 * Finds the rowtime attributes if defined.
+	 */
+	public static List<RowtimeAttributeDescriptor> deriveRowtimeAttributes(
+			DescriptorProperties properties) {
+
+		Map<String, String> names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME);
+
+		List<RowtimeAttributeDescriptor> attributes = new ArrayList<>();
+
+		// check for rowtime in every field
+		for (int i = 0; i < names.size(); i++) {
+			Optional<Tuple2<TimestampExtractor, WatermarkStrategy>> rowtimeComponents = RowtimeValidator
+				.getRowtimeComponents(properties, SCHEMA + "." + i + ".");
+			int index = i;
+			// create descriptor
+			rowtimeComponents.ifPresent(tuple2 -> attributes.add(new RowtimeAttributeDescriptor(
+				properties.getString(SCHEMA + "." + index + "." + SCHEMA_NAME),
+				tuple2.f0,
+				tuple2.f1))
+			);
+		}
+
+		return attributes;
+	}
+
+	/**
+	 * Derives the table schema for a table sink. A sink ignores a proctime attribute and
+	 * needs to track the origin of a rowtime field.
+	 *
+	 * @deprecated This method combines two separate concepts of table schema and field mapping.
+	 *             This should be split into two methods once we have support for
+	 *             the corresponding interfaces (see FLINK-9870).
+	 */
+	@Deprecated
+	public static TableSchema deriveTableSinkSchema(DescriptorProperties properties) {
+		TableSchema.Builder builder = TableSchema.builder();
+
+		TableSchema schema = properties.getTableSchema(SCHEMA);
+
+		for (int i = 0; i < schema.getFieldCount(); i++) {
+			TypeInformation<?> t = schema.getFieldTypes()[i];
+			String n = schema.getFieldNames()[i];
+			boolean isProctime = properties
+				.getOptionalBoolean(SCHEMA + "." + i + "." + SCHEMA_PROCTIME)
+				.orElse(false);
+			String tsType = SCHEMA + "." + i + "." + ROWTIME_TIMESTAMPS_TYPE;
+			boolean isRowtime = properties.containsKey(tsType);
+			if (!isProctime && !isRowtime) {
+				// check for an alias
+				String fieldName = properties.getOptionalString(SCHEMA + "." + i + "." + SCHEMA_FROM)
+					.orElse(n);
+				builder.field(fieldName, t);
+			}
+			// only use the rowtime attribute if it references a field
+			else if (isRowtime) {
+				switch (properties.getString(tsType)) {
+					case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD:
+						String field = properties.getString(SCHEMA + "." + i + "." + ROWTIME_TIMESTAMPS_FROM);
+						builder.field(field, t);
+						break;
+					// other timestamp strategies require a reverse timestamp extractor to
+					// insert the timestamp into the output
+					default:
+						throw new TableException(format("Unsupported rowtime type '%s' for sink" +
+							" table schema. Currently only '%s' is supported for table sinks.",
+							properties.getString(tsType), ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD));
+				}
+			}
+		}
+
+		return builder.build();
+	}
+
+	/**
+	 * Finds a table source field mapping.
+	 *
+	 * @param properties The properties describing a schema.
+	 * @param inputType  The input type that a connector and/or format produces. This parameter
+	 *                   can be used to resolve a rowtime field against an input field.
+	 */
+	public static Map<String, String> deriveFieldMapping(
+			DescriptorProperties properties, Optional<TypeInformation<?>> inputType) {
+
+		Map<String, String> mapping = new HashMap<>();
+
+		TableSchema schema = properties.getTableSchema(SCHEMA);
+		// only composite input types expose field names; any other input type contributes none
+		List<String> columnNames = new ArrayList<>();
+		inputType.filter(t -> t instanceof CompositeType).ifPresent(t -> columnNames.addAll(Arrays.asList(((CompositeType<?>) t).getFieldNames())));
+
+		// add all source fields first because rowtime might reference one of them
+		columnNames.forEach(name -> mapping.put(name, name));
+
+		// then add all schema fields for implicit mappings
+		Arrays.stream(schema.getFieldNames()).forEach(name -> mapping.put(name, name));
+
+		Map<String, String> names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME);
+
+		for (int i = 0; i < names.size(); i++) {
+			String name = properties.getString(SCHEMA + "." + i + "." + SCHEMA_NAME);
+			Optional<String> source = properties.getOptionalString(SCHEMA + "." + i + "." + SCHEMA_FROM);
+			if (source.isPresent()) {
+				// add explicit mapping
+				mapping.put(name, source.get());
+			} else { // implicit mapping or time
+				boolean isProctime = properties
+					.getOptionalBoolean(SCHEMA + "." + i + "." + SCHEMA_PROCTIME)
+					.orElse(false);
+				boolean isRowtime = properties
+					.containsKey(SCHEMA + "." + i + "." + ROWTIME_TIMESTAMPS_TYPE);
+				// remove proctime/rowtime from mapping
+				if (isProctime || isRowtime) {
+					mapping.remove(name);
+				}
+				// check for invalid fields
+				else if (!columnNames.contains(name)) {
+					throw new ValidationException(format("Could not map the schema field '%s' to a field " +
+						"from source. Please specify the source field from which it can be derived.", name));
+				}
+			}
+		}
+
+		return mapping;
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
deleted file mode 100644
index a3b6e8d..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.descriptors.Rowtime._
-import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
-import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
-import org.apache.flink.table.util.JavaScalaConversionUtil.toJava
-import org.apache.flink.table.utils.EncodingUtils
-
-import scala.collection.JavaConverters._
-
-/**
- * Validator for [[Rowtime]].
- */
-class RowtimeValidator(
- supportsSourceTimestamps: Boolean,
- supportsSourceWatermarks: Boolean,
- prefix: String = "")
- extends DescriptorValidator {
-
- override def validate(properties: DescriptorProperties): Unit = {
- val timestampExistingField = (_: String) => {
- properties.validateString(
- prefix + ROWTIME_TIMESTAMPS_FROM, false, 1)
- }
-
- val timestampCustom = (_: String) => {
- properties.validateString(
- prefix + ROWTIME_TIMESTAMPS_CLASS, false, 1)
- properties.validateString(
- prefix + ROWTIME_TIMESTAMPS_SERIALIZED, false, 1)
- }
-
- val timestampsValidation = if (supportsSourceTimestamps) {
- Map(
- ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> toJava(timestampExistingField),
- ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE -> DescriptorProperties.noValidation(),
- ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom))
- } else {
- Map(
- ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> toJava(timestampExistingField),
- ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom))
- }
-
- properties.validateEnum(
- prefix + ROWTIME_TIMESTAMPS_TYPE,
- false,
- timestampsValidation.asJava
- )
-
- val watermarkPeriodicBounded = (_: String) => {
- properties.validateLong(
- prefix + ROWTIME_WATERMARKS_DELAY, false, 0)
- }
-
- val watermarkCustom = (_: String) => {
- properties.validateString(
- prefix + ROWTIME_WATERMARKS_CLASS, false, 1)
- properties.validateString(
- prefix + ROWTIME_WATERMARKS_SERIALIZED, false, 1)
- }
-
- val watermarksValidation = if (supportsSourceWatermarks) {
- Map(
- ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> DescriptorProperties.noValidation(),
- ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED -> toJava(watermarkPeriodicBounded),
- ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE -> DescriptorProperties.noValidation(),
- ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom))
- } else {
- Map(
- ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> DescriptorProperties.noValidation(),
- ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED -> toJava(watermarkPeriodicBounded),
- ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom))
- }
-
- properties.validateEnum(
- prefix + ROWTIME_WATERMARKS_TYPE,
- false,
- watermarksValidation.asJava
- )
- }
-}
-
-object RowtimeValidator {
-
- // utilities
-
- def getRowtimeComponents(properties: DescriptorProperties, prefix: String)
- : Option[(TimestampExtractor, WatermarkStrategy)] = {
-
- // create timestamp extractor
- val t = properties.getOptionalString(prefix + ROWTIME_TIMESTAMPS_TYPE)
- if (!t.isPresent) {
- return None
- }
- val extractor: TimestampExtractor = t.get() match {
-
- case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD =>
- val field = properties.getString(prefix + ROWTIME_TIMESTAMPS_FROM)
- new ExistingField(field)
-
- case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE =>
- StreamRecordTimestamp.INSTANCE
-
- case ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM =>
- val clazz = properties.getClass(
- prefix + ROWTIME_TIMESTAMPS_CLASS,
- classOf[TimestampExtractor])
- EncodingUtils.decodeStringToObject(
- properties.getString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED),
- clazz)
- }
-
- // create watermark strategy
- val s = properties.getString(prefix + ROWTIME_WATERMARKS_TYPE)
- val strategy: WatermarkStrategy = s match {
-
- case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING =>
- new AscendingTimestamps()
-
- case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED =>
- val delay = properties.getLong(prefix + ROWTIME_WATERMARKS_DELAY)
- new BoundedOutOfOrderTimestamps(delay)
-
- case ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE =>
- PreserveWatermarks.INSTANCE
-
- case ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM =>
- val clazz = properties.getClass(
- prefix + ROWTIME_WATERMARKS_CLASS,
- classOf[WatermarkStrategy])
- EncodingUtils.decodeStringToObject(
- properties.getString(prefix + ROWTIME_WATERMARKS_SERIALIZED),
- clazz)
- }
-
- Some((extractor, strategy))
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
deleted file mode 100644
index f5769bf..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-import java.util.Optional
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.api.{TableException, TableSchema, ValidationException}
-import org.apache.flink.table.descriptors.Rowtime._
-import org.apache.flink.table.descriptors.Schema._
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor
-import org.apache.flink.table.util.JavaScalaConversionUtil.{toJava, toScala}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/**
- * Validator for [[Schema]].
- */
-class SchemaValidator(
- isStreamEnvironment: Boolean,
- supportsSourceTimestamps: Boolean,
- supportsSourceWatermarks: Boolean)
- extends DescriptorValidator {
-
- override def validate(properties: DescriptorProperties): Unit = {
- val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
- val types = properties.getIndexedProperty(SCHEMA, SCHEMA_TYPE)
-
- if (names.isEmpty && types.isEmpty) {
- throw new ValidationException(
- s"Could not find the required schema in property '$SCHEMA'.")
- }
-
- var proctimeFound = false
-
- for (i <- 0 until Math.max(names.size, types.size)) {
- properties
- .validateString(s"$SCHEMA.$i.$SCHEMA_NAME", false, 1)
- properties
- .validateType(s"$SCHEMA.$i.$SCHEMA_TYPE", false, false)
- properties
- .validateString(s"$SCHEMA.$i.$SCHEMA_FROM", true, 1)
- // either proctime or rowtime
- val proctime = s"$SCHEMA.$i.$SCHEMA_PROCTIME"
- val rowtime = s"$SCHEMA.$i.$ROWTIME"
- if (properties.containsKey(proctime)) {
- // check the environment
- if (!isStreamEnvironment) {
- throw new ValidationException(
- s"Property '$proctime' is not allowed in a batch environment.")
- }
- // check for only one proctime attribute
- else if (proctimeFound) {
- throw new ValidationException("A proctime attribute must only be defined once.")
- }
- // check proctime
- properties.validateBoolean(proctime, false)
- proctimeFound = properties.getBoolean(proctime)
- // no rowtime
- properties.validatePrefixExclusion(rowtime)
- } else if (properties.hasPrefix(rowtime)) {
- // check rowtime
- val rowtimeValidator = new RowtimeValidator(
- supportsSourceTimestamps,
- supportsSourceWatermarks,
- s"$SCHEMA.$i.")
- rowtimeValidator.validate(properties)
- // no proctime
- properties.validateExclusion(proctime)
- }
- }
- }
-}
-
-object SchemaValidator {
-
- /**
- * Returns keys for a
- * [[org.apache.flink.table.factories.TableFormatFactory.supportedProperties()]] method that
- * are accepted for schema derivation using [[deriveFormatFields(DescriptorProperties)]].
- */
- def getSchemaDerivationKeys: util.List[String] = {
- val keys = new util.ArrayList[String]()
-
- // schema
- keys.add(SCHEMA + ".#." + SCHEMA_TYPE)
- keys.add(SCHEMA + ".#." + SCHEMA_NAME)
- keys.add(SCHEMA + ".#." + SCHEMA_FROM)
-
- // time attributes
- keys.add(SCHEMA + ".#." + SCHEMA_PROCTIME)
- keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE)
- keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM)
- keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS)
- keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED)
- keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE)
- keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS)
- keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED)
- keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY)
-
- keys
- }
-
- // utilities
-
- /**
- * Finds the proctime attribute if defined.
- */
- def deriveProctimeAttribute(properties: DescriptorProperties): Optional[String] = {
- val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
-
- for (i <- 0 until names.size) {
- val isProctime = toScala(
- properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
- isProctime.foreach { isSet =>
- if (isSet) {
- return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
- }
- }
- }
- toJava(None)
- }
-
- /**
- * Finds the rowtime attributes if defined.
- */
- def deriveRowtimeAttributes(properties: DescriptorProperties)
- : util.List[RowtimeAttributeDescriptor] = {
-
- val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
-
- var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
-
- // check for rowtime in every field
- for (i <- 0 until names.size) {
- RowtimeValidator
- .getRowtimeComponents(properties, s"$SCHEMA.$i.")
- .foreach { case (extractor, strategy) =>
- // create descriptor
- attributes += new RowtimeAttributeDescriptor(
- properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
- extractor,
- strategy)
- }
- }
-
- attributes.asJava
- }
-
- /**
- * Derives the table schema for a table sink. A sink ignores a proctime attribute and
- * needs to track the origin of a rowtime field.
- *
- * @deprecated This method combines two separate concepts of table schema and field mapping.
- * This should be split into two methods once we have support for
- * the corresponding interfaces (see FLINK-9870).
- */
- @deprecated
- def deriveTableSinkSchema(properties: DescriptorProperties): TableSchema = {
- val builder = TableSchema.builder()
-
- val schema = properties.getTableSchema(SCHEMA)
-
- schema.getFieldNames.zip(schema.getFieldTypes).zipWithIndex.foreach { case ((n, t), i) =>
- val isProctime = properties
- .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
- .orElse(false)
- val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE"
- val isRowtime = properties.containsKey(tsType)
- if (!isProctime && !isRowtime) {
- // check for a aliasing
- val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
- .orElse(n)
- builder.field(fieldName, t)
- }
- // only use the rowtime attribute if it references a field
- else if (isRowtime) {
- properties.getString(tsType) match {
- case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD =>
- val field = properties.getString(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_FROM")
- builder.field(field, t)
-
- // other timestamp strategies require a reverse timestamp extractor to
- // insert the timestamp into the output
- case t@_ =>
- throw new TableException(
- s"Unsupported rowtime type '$t' for sink table schema. Currently " +
- s"only '$ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD' is supported for table sinks.")
- }
- }
- }
-
- builder.build()
- }
-
- /**
- * Finds a table source field mapping.
- *
- * @param properties The properties describing a schema.
- * @param inputType The input type that a connector and/or format produces. This parameter
- * can be used to resolve a rowtime field against an input field.
- */
- def deriveFieldMapping(
- properties: DescriptorProperties,
- inputType: Optional[TypeInformation[_]])
- : util.Map[String, String] = {
-
- val mapping = mutable.Map[String, String]()
-
- val schema = properties.getTableSchema(SCHEMA)
-
- val columnNames = toScala(inputType) match {
- case Some(composite: CompositeType[_]) => composite.getFieldNames.toSeq
- case _ => Seq[String]()
- }
-
- // add all source fields first because rowtime might reference one of them
- columnNames.foreach(name => mapping.put(name, name))
-
- // add all schema fields first for implicit mappings
- schema.getFieldNames.foreach { name =>
- mapping.put(name, name)
- }
-
- val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
-
- for (i <- 0 until names.size) {
- val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
- toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) match {
-
- // add explicit mapping
- case Some(source) =>
- mapping.put(name, source)
-
- // implicit mapping or time
- case None =>
- val isProctime = properties
- .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
- .orElse(false)
- val isRowtime = properties
- .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
- // remove proctime/rowtime from mapping
- if (isProctime || isRowtime) {
- mapping.remove(name)
- }
- // check for invalid fields
- else if (!columnNames.contains(name)) {
- throw new ValidationException(s"Could not map the schema field '$name' to a field " +
- s"from source. Please specify the source field from which it can be derived.")
- }
- }
- }
-
- mapping.toMap.asJava
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
index 1365eae..3aa114e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
@@ -70,10 +70,7 @@ abstract class CsvTableSinkFactoryBase extends TableFactory {
// validate
new FileSystemValidator().validate(params)
new OldCsvValidator().validate(params)
- new SchemaValidator(
- isStreaming,
- supportsSourceTimestamps = false,
- supportsSourceWatermarks = false).validate(params)
+ new SchemaValidator(isStreaming, false, false).validate(params)
// build
val formatSchema = params.getTableSchema(FORMAT_FIELDS)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
index 41c9698..a7a48bb 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
@@ -75,10 +75,7 @@ abstract class CsvTableSourceFactoryBase extends TableFactory {
// validate
new FileSystemValidator().validate(params)
new OldCsvValidator().validate(params)
- new SchemaValidator(
- isStreaming,
- supportsSourceTimestamps = false,
- supportsSourceWatermarks = false).validate(params)
+ new SchemaValidator(isStreaming, false, false).validate(params)
// build
val csvTableSourceBuilder = new CsvTableSource.Builder
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
index 0f51c76..8bd327c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
@@ -71,7 +71,7 @@ class RowtimeTest extends DescriptorTestBase {
}
override def validator(): DescriptorValidator = {
- new RowtimeValidator(supportsSourceTimestamps = true, supportsSourceWatermarks = false)
+ new RowtimeValidator(true, false)
}
override def properties(): util.List[util.Map[String, String]] = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
index 8bf9709..3ab1a67 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
@@ -61,10 +61,7 @@ class SchemaTest extends DescriptorTestBase {
}
override def validator(): DescriptorValidator = {
- new SchemaValidator(
- isStreamEnvironment = true,
- supportsSourceTimestamps = true,
- supportsSourceWatermarks = true)
+ new SchemaValidator(true, true, true)
}
override def properties(): util.List[util.Map[String, String]] = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
index 28f68d5..6de7525 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
@@ -51,10 +51,7 @@ class InMemoryTableFactory(terminationCount: Int)
params.putProperties(properties)
// validate
- new SchemaValidator(
- isStreamEnvironment = true,
- supportsSourceTimestamps = true,
- supportsSourceWatermarks = true).validate(params)
+ new SchemaValidator(true, true, true).validate(params)
val tableSchema = SchemaValidator.deriveTableSinkSchema(params)
@@ -71,10 +68,7 @@ class InMemoryTableFactory(terminationCount: Int)
params.putProperties(properties)
// validate
- new SchemaValidator(
- isStreamEnvironment = true,
- supportsSourceTimestamps = true,
- supportsSourceWatermarks = true).validate(params)
+ new SchemaValidator(true, true, true).validate(params)
val tableSchema = params.getTableSchema(SCHEMA)