You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/19 13:35:48 UTC

[flink] branch master updated (311dd15 -> 0b53eef)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 311dd15  [FLINK-13255][hive] Skip tests on Java 9
     new 86ece85  [FLINK-13286][table-api] Port RowtimeValidator and SchemaValidator to api-java-bridge
     new 0b53eef  [FLINK-13286][table-common] Port FileSystem and FileSystemValidator to common

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/table/descriptors/RowtimeValidator.java  | 176 +++++++++++++
 .../flink/table/descriptors/SchemaValidator.java   | 287 +++++++++++++++++++++
 .../{Descriptor.java => FileSystem.java}           |  39 ++-
 .../table/descriptors/FileSystemValidator.java}    |  14 +-
 .../flink/table/descriptors/FileSystem.scala       |  64 -----
 .../table/descriptors/FileSystemValidator.scala    |  41 ---
 .../flink/table/descriptors/RowtimeValidator.scala | 156 -----------
 .../flink/table/descriptors/SchemaValidator.scala  | 275 --------------------
 .../table/sinks/CsvTableSinkFactoryBase.scala      |   5 +-
 .../table/sources/CsvTableSourceFactoryBase.scala  |   5 +-
 .../flink/table/descriptors/FileSystemTest.scala   |   2 +-
 .../flink/table/descriptors/RowtimeTest.scala      |   2 +-
 .../flink/table/descriptors/SchemaTest.scala       |   5 +-
 .../table/descriptors/TableDescriptorTest.scala    |   2 +-
 .../flink/table/runtime/utils/CommonTestData.scala |   6 +-
 .../flink/table/utils/InMemoryTableFactory.scala   |  10 +-
 16 files changed, 511 insertions(+), 578 deletions(-)
 create mode 100644 flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/RowtimeValidator.java
 create mode 100644 flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
 copy flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/{Descriptor.java => FileSystem.java} (54%)
 copy flink-table/{flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/GenericInMemoryCatalogValidator.java => flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystemValidator.java} (68%)
 delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
 delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
 delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala


[flink] 01/02: [FLINK-13286][table-api] Port RowtimeValidator and SchemaValidator to api-java-bridge

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 86ece85369ad3e53e23dda57a0e95483942a30b0
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jul 18 20:57:12 2019 +0800

    [FLINK-13286][table-api] Port RowtimeValidator and SchemaValidator to api-java-bridge
    
    This closes #9168
---
 .../flink/table/descriptors/RowtimeValidator.java  | 176 +++++++++++++
 .../flink/table/descriptors/SchemaValidator.java   | 287 +++++++++++++++++++++
 .../flink/table/descriptors/RowtimeValidator.scala | 156 -----------
 .../flink/table/descriptors/SchemaValidator.scala  | 275 --------------------
 .../table/sinks/CsvTableSinkFactoryBase.scala      |   5 +-
 .../table/sources/CsvTableSourceFactoryBase.scala  |   5 +-
 .../flink/table/descriptors/RowtimeTest.scala      |   2 +-
 .../flink/table/descriptors/SchemaTest.scala       |   5 +-
 .../flink/table/utils/InMemoryTableFactory.scala   |  10 +-
 9 files changed, 469 insertions(+), 452 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/RowtimeValidator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/RowtimeValidator.java
new file mode 100644
index 0000000..9061a02
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/RowtimeValidator.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sources.tsextractors.ExistingField;
+import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
+import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
+import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks;
+import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED;
+
+/**
+ * Validator for {@link Rowtime}.
+ */
+@PublicEvolving
+public class RowtimeValidator implements DescriptorValidator {
+
+	private final boolean supportsSourceTimestamps;
+	private final boolean supportsSourceWatermarks;
+	private final String prefix;
+
+	public RowtimeValidator(boolean supportsSourceTimestamps, boolean supportsSourceWatermarks) {
+		this(supportsSourceTimestamps, supportsSourceWatermarks, ""); // empty prefix: keys are used unmodified
+	}
+
+	public RowtimeValidator(boolean supportsSourceTimestamps, boolean supportsSourceWatermarks, String prefix) {
+		this.supportsSourceTimestamps = supportsSourceTimestamps; // enables the from-source timestamps type
+		this.supportsSourceWatermarks = supportsSourceWatermarks; // enables the from-source watermarks type
+		this.prefix = prefix; // prepended to every ROWTIME_* key in validate()
+	}
+
+	/**
+	 * Validates the rowtime timestamps type and watermarks type, both looked up under
+	 * {@code prefix}, with {@code validateEnum}. Every accepted type value is registered
+	 * together with the check of the keys that depend on it.
+	 *
+	 * <p>The from-source values are only registered when the corresponding
+	 * {@code supportsSource*} flag is set.
+	 */
+	@Override
+	public void validate(DescriptorProperties properties) {
+		// Accepted timestamps types, each mapped to the check of its own sub-keys.
+		Map<String, Consumer<String>> timestampChecks = new HashMap<>();
+		timestampChecks.put(
+				ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD,
+				type -> properties.validateString(prefix + ROWTIME_TIMESTAMPS_FROM, false, 1));
+		if (supportsSourceTimestamps) {
+			// registered without any dependent key to check
+			timestampChecks.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE, DescriptorProperties.noValidation());
+		}
+		timestampChecks.put(
+				ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM,
+				type -> {
+					properties.validateString(prefix + ROWTIME_TIMESTAMPS_CLASS, false, 1);
+					properties.validateString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED, false, 1);
+				});
+		properties.validateEnum(prefix + ROWTIME_TIMESTAMPS_TYPE, false, timestampChecks);
+
+		// Accepted watermarks types, built the same way.
+		Map<String, Consumer<String>> watermarkChecks = new HashMap<>();
+		watermarkChecks.put(ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING, DescriptorProperties.noValidation());
+		watermarkChecks.put(
+				ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED,
+				type -> properties.validateLong(prefix + ROWTIME_WATERMARKS_DELAY, false, 0));
+		if (supportsSourceWatermarks) {
+			watermarkChecks.put(ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE, DescriptorProperties.noValidation());
+		}
+		watermarkChecks.put(
+				ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM,
+				type -> {
+					properties.validateString(prefix + ROWTIME_WATERMARKS_CLASS, false, 1);
+					properties.validateString(prefix + ROWTIME_WATERMARKS_SERIALIZED, false, 1);
+				});
+		properties.validateEnum(prefix + ROWTIME_WATERMARKS_TYPE, false, watermarkChecks);
+	}
+
+	// utilities
+
+	/**
+	 * Creates the timestamp extractor and watermark strategy configured under {@code prefix}.
+	 *
+	 * @return both components, or empty if no rowtime timestamps type is set under the prefix
+	 * @throws ValidationException if the timestamps type or the watermarks type is unsupported
+	 */
+	public static Optional<Tuple2<TimestampExtractor, WatermarkStrategy>> getRowtimeComponents(
+			DescriptorProperties properties, String prefix) {
+		// create timestamp extractor
+		TimestampExtractor extractor;
+		Optional<String> t = properties.getOptionalString(prefix + ROWTIME_TIMESTAMPS_TYPE);
+		if (!t.isPresent()) {
+			return Optional.empty();
+		}
+
+		switch (t.get()) {
+			case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD:
+				extractor = new ExistingField(properties.getString(prefix + ROWTIME_TIMESTAMPS_FROM));
+				break;
+			case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE:
+				extractor = StreamRecordTimestamp.INSTANCE;
+				break;
+			case ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM:
+				Class<TimestampExtractor> clazz = properties.getClass(prefix + ROWTIME_TIMESTAMPS_CLASS, TimestampExtractor.class);
+				extractor = EncodingUtils.decodeStringToObject(
+						properties.getString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED), clazz);
+				break;
+			default:
+				throw new ValidationException("Unsupported rowtime timestamps type: " + t.get());
+		}
+
+		// create watermark strategy
+		WatermarkStrategy strategy;
+		String s = properties.getString(prefix + ROWTIME_WATERMARKS_TYPE);
+		switch (s) {
+			case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING:
+				strategy = new AscendingTimestamps();
+				break;
+			case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED:
+				strategy = new BoundedOutOfOrderTimestamps(properties.getLong(prefix + ROWTIME_WATERMARKS_DELAY));
+				break;
+			case ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE:
+				strategy = PreserveWatermarks.INSTANCE;
+				break;
+			case ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM:
+				Class<WatermarkStrategy> clazz = properties.getClass(prefix + ROWTIME_WATERMARKS_CLASS, WatermarkStrategy.class);
+				strategy = EncodingUtils.decodeStringToObject(
+						properties.getString(prefix + ROWTIME_WATERMARKS_SERIALIZED), clazz);
+				break;
+			default:
+				throw new ValidationException("Unsupported rowtime watermarks type: " + s);
+		}
+
+		return Optional.of(new Tuple2<>(extractor, strategy));
+	}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
new file mode 100644
index 0000000..2ad92c1
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.TableFormatFactory;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
+import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.lang.String.format;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Validator for {@link Schema}.
+ */
+@PublicEvolving
+public class SchemaValidator implements DescriptorValidator {
+
+	private final boolean isStreamEnvironment;
+	private final boolean supportsSourceTimestamps;
+	private final boolean supportsSourceWatermarks;
+
+	public SchemaValidator(boolean isStreamEnvironment, boolean supportsSourceTimestamps,
+			boolean supportsSourceWatermarks) {
+		this.isStreamEnvironment = isStreamEnvironment; // validate() rejects proctime keys when false
+		this.supportsSourceTimestamps = supportsSourceTimestamps; // passed on to RowtimeValidator
+		this.supportsSourceWatermarks = supportsSourceWatermarks; // passed on to RowtimeValidator
+	}
+
+	@Override
+	public void validate(DescriptorProperties properties) {
+		Map<String, String> names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME);
+		Map<String, String> types = properties.getIndexedProperty(SCHEMA, SCHEMA_TYPE);
+
+		if (names.isEmpty() && types.isEmpty()) {
+			throw new ValidationException(
+					format("Could not find the required schema in property '%s'.", SCHEMA));
+		}
+
+		boolean proctimeFound = false;
+
+		for (int i = 0; i < Math.max(names.size(), types.size()); i++) {
+			properties.validateString(SCHEMA + "." + i + "." + SCHEMA_NAME, false, 1);
+			properties.validateType(SCHEMA + "." + i + "." + SCHEMA_TYPE, false, false);
+			properties.validateString(SCHEMA + "." + i + "." + SCHEMA_FROM, true, 1);
+			// either proctime or rowtime
+			String proctime = SCHEMA + "." + i + "." + SCHEMA_PROCTIME;
+			String rowtime = SCHEMA + "." + i + "." + ROWTIME;
+			if (properties.containsKey(proctime)) {
+				// check the environment
+				if (!isStreamEnvironment) {
+					throw new ValidationException(
+							format("Property '%s' is not allowed in a batch environment.", proctime));
+				}
+				// check for only one proctime attribute
+				else if (proctimeFound) {
+					throw new ValidationException("A proctime attribute must only be defined once.");
+				}
+				// check proctime
+				properties.validateBoolean(proctime, false);
+				proctimeFound = properties.getBoolean(proctime);
+				// no rowtime
+				properties.validatePrefixExclusion(rowtime);
+			} else if (properties.hasPrefix(rowtime)) {
+				// check rowtime
+				RowtimeValidator rowtimeValidator = new RowtimeValidator(
+						supportsSourceTimestamps,
+						supportsSourceWatermarks,
+						SCHEMA + "." + i + ".");
+				rowtimeValidator.validate(properties);
+				// no proctime
+				properties.validateExclusion(proctime);
+			}
+		}
+	}
+
+	/**
+	 * Returns keys for a {@link TableFormatFactory#supportedProperties()} method that
+	 * are accepted for schema derivation using {@code deriveFormatFields(DescriptorProperties)}.
+	 * The list is newly created on every call: schema keys first, then time attribute keys.
+	 */
+	public static List<String> getSchemaDerivationKeys() {
+		final String keyPrefix = SCHEMA + ".#.";
+		final String[] suffixes = {
+			// schema
+			SCHEMA_TYPE, SCHEMA_NAME, SCHEMA_FROM,
+			// time attributes
+			SCHEMA_PROCTIME,
+			ROWTIME_TIMESTAMPS_TYPE, ROWTIME_TIMESTAMPS_FROM,
+			ROWTIME_TIMESTAMPS_CLASS, ROWTIME_TIMESTAMPS_SERIALIZED,
+			ROWTIME_WATERMARKS_TYPE, ROWTIME_WATERMARKS_CLASS,
+			ROWTIME_WATERMARKS_SERIALIZED, ROWTIME_WATERMARKS_DELAY
+		};
+
+		List<String> keys = new ArrayList<>();
+		for (String suffix : suffixes) {
+			keys.add(keyPrefix + suffix);
+		}
+
+		return keys;
+	}
+
+	/**
+	 * Finds the proctime attribute if defined.
+	 */
+	public static Optional<String> deriveProctimeAttribute(DescriptorProperties properties) {
+		Map<String, String> names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME);
+
+		for (int i = 0; i < names.size(); i++) {
+			Optional<Boolean> isProctime = properties.getOptionalBoolean(SCHEMA + "." + i + "." + SCHEMA_PROCTIME);
+			if (isProctime.isPresent() && isProctime.get()) {
+				return Optional.of(names.get(SCHEMA + "." + i + "." + SCHEMA_NAME));
+			}
+		}
+		return Optional.empty();
+	}
+
+	/**
+	 * Collects a {@link RowtimeAttributeDescriptor} for each field that has rowtime components.
+	 */
+	public static List<RowtimeAttributeDescriptor> deriveRowtimeAttributes(
+			DescriptorProperties properties) {
+		Map<String, String> names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME);
+
+		List<RowtimeAttributeDescriptor> attributes = new ArrayList<>();
+
+		// check for rowtime in every field
+		for (int i = 0; i < names.size(); i++) {
+			final String fieldPrefix = SCHEMA + "." + i + ".";
+			Optional<Tuple2<TimestampExtractor, WatermarkStrategy>> components =
+					RowtimeValidator.getRowtimeComponents(properties, fieldPrefix);
+			if (!components.isPresent()) {
+				continue;
+			}
+			attributes.add(new RowtimeAttributeDescriptor(
+					properties.getString(fieldPrefix + SCHEMA_NAME),
+					components.get().f0,
+					components.get().f1));
+		}
+
+		return attributes;
+	}
+
+	/**
+	 * Derives the table schema for a table sink. A sink ignores a proctime attribute and
+	 * needs to track the origin of a rowtime field.
+	 *
+	 * @throws TableException if a rowtime type is not {@code ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD}
+	 * @deprecated This method combines two separate concepts of table schema and field mapping.
+	 *             This should be split into two methods once we have support for
+	 *             the corresponding interfaces (see FLINK-9870).
+	 */
+	@Deprecated
+	public static TableSchema deriveTableSinkSchema(DescriptorProperties properties) {
+		TableSchema.Builder builder = TableSchema.builder();
+		TableSchema schema = properties.getTableSchema(SCHEMA);
+
+		for (int i = 0; i < schema.getFieldCount(); i++) {
+			TypeInformation t = schema.getFieldTypes()[i];
+			String n = schema.getFieldNames()[i];
+			String fieldPrefix = SCHEMA + "." + i + ".";
+			boolean isProctime = properties.getOptionalBoolean(fieldPrefix + SCHEMA_PROCTIME).orElse(false);
+			String tsType = fieldPrefix + ROWTIME_TIMESTAMPS_TYPE;
+			boolean isRowtime = properties.containsKey(tsType);
+			if (!isProctime && !isRowtime) {
+				// use the alias given by the 'from' property, else the field's own name
+				String fieldName = properties.getOptionalString(fieldPrefix + SCHEMA_FROM).orElse(n);
+				builder.field(fieldName, t);
+			}
+			// only use the rowtime attribute if it references a field
+			else if (isRowtime) {
+				String rowtimeType = properties.getString(tsType);
+				switch (rowtimeType) {
+					case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD:
+						String field = properties.getString(fieldPrefix + ROWTIME_TIMESTAMPS_FROM);
+						builder.field(field, t);
+						break;
+					// other timestamp strategies require a reverse timestamp extractor to
+					// insert the timestamp into the output
+					default:
+						// report the rowtime type itself, not the field's TypeInformation
+						throw new TableException(format("Unsupported rowtime type '%s' for sink" +
+								" table schema. Currently only '%s' is supported for table sinks.",
+								rowtimeType, ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD));
+				}
+			}
+		}
+
+		return builder.build();
+	}
+
+	/**
+	 * Finds a table source field mapping.
+	 *
+	 * @param properties The properties describing a schema.
+	 * @param inputType  The input type that a connector and/or format produces. This parameter
+	 *                   can be used to resolve a rowtime field against an input field. Only a
+	 *                   {@link CompositeType} contributes input field names.
+	 * @return the mapping from field names to the names of the fields they are read from
+	 * @throws ValidationException if a non-time field without a 'from' property matches no input field
+	 */
+	public static Map<String, String> deriveFieldMapping(
+			DescriptorProperties properties, Optional<TypeInformation<?>> inputType) {
+		Map<String, String> mapping = new HashMap<>();
+		TableSchema schema = properties.getTableSchema(SCHEMA);
+
+		List<String> columnNames = new ArrayList<>();
+		// a non-composite input type exposes no named fields and therefore adds no columns
+		inputType.filter(t -> t instanceof CompositeType)
+				.ifPresent(t -> columnNames.addAll(Arrays.asList(((CompositeType) t).getFieldNames())));
+
+		// add all source fields first because rowtime might reference one of them
+		columnNames.forEach(name -> mapping.put(name, name));
+
+		// add all schema fields first for implicit mappings
+		Arrays.stream(schema.getFieldNames()).forEach(name -> mapping.put(name, name));
+
+		Map<String, String> names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME);
+		for (int i = 0; i < names.size(); i++) {
+			String fieldPrefix = SCHEMA + "." + i + ".";
+			String name = properties.getString(fieldPrefix + SCHEMA_NAME);
+			Optional<String> source = properties.getOptionalString(fieldPrefix + SCHEMA_FROM);
+			if (source.isPresent()) {
+				// add explicit mapping
+				mapping.put(name, source.get());
+			} else { // implicit mapping or time
+				boolean isProctime = properties.getOptionalBoolean(fieldPrefix + SCHEMA_PROCTIME).orElse(false);
+				boolean isRowtime = properties.containsKey(fieldPrefix + ROWTIME_TIMESTAMPS_TYPE);
+				// remove proctime/rowtime from mapping
+				if (isProctime || isRowtime) {
+					mapping.remove(name);
+				}
+				// check for invalid fields
+				else if (!columnNames.contains(name)) {
+					throw new ValidationException(format("Could not map the schema field '%s' to a field " +
+							"from source. Please specify the source field from which it can be derived.", name));
+				}
+			}
+		}
+
+		return mapping;
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
deleted file mode 100644
index a3b6e8d..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.descriptors.Rowtime._
-import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
-import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
-import org.apache.flink.table.util.JavaScalaConversionUtil.toJava
-import org.apache.flink.table.utils.EncodingUtils
-
-import scala.collection.JavaConverters._
-
-/**
-  * Validator for [[Rowtime]].
-  */
-class RowtimeValidator(
-    supportsSourceTimestamps: Boolean,
-    supportsSourceWatermarks: Boolean,
-    prefix: String = "")
-  extends DescriptorValidator {
-
-  override def validate(properties: DescriptorProperties): Unit = {
-    val timestampExistingField = (_: String) => {
-      properties.validateString(
-        prefix + ROWTIME_TIMESTAMPS_FROM, false, 1)
-    }
-
-    val timestampCustom = (_: String) => {
-      properties.validateString(
-        prefix + ROWTIME_TIMESTAMPS_CLASS, false, 1)
-      properties.validateString(
-        prefix + ROWTIME_TIMESTAMPS_SERIALIZED, false, 1)
-    }
-
-    val timestampsValidation = if (supportsSourceTimestamps) {
-      Map(
-        ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> toJava(timestampExistingField),
-        ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE -> DescriptorProperties.noValidation(),
-        ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom))
-    } else {
-      Map(
-        ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> toJava(timestampExistingField),
-        ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom))
-    }
-
-    properties.validateEnum(
-      prefix + ROWTIME_TIMESTAMPS_TYPE,
-      false,
-      timestampsValidation.asJava
-    )
-
-    val watermarkPeriodicBounded = (_: String) => {
-      properties.validateLong(
-        prefix + ROWTIME_WATERMARKS_DELAY, false, 0)
-    }
-
-    val watermarkCustom = (_: String) => {
-      properties.validateString(
-        prefix + ROWTIME_WATERMARKS_CLASS, false, 1)
-      properties.validateString(
-        prefix + ROWTIME_WATERMARKS_SERIALIZED, false, 1)
-    }
-
-    val watermarksValidation = if (supportsSourceWatermarks) {
-      Map(
-        ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> DescriptorProperties.noValidation(),
-        ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED -> toJava(watermarkPeriodicBounded),
-        ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE -> DescriptorProperties.noValidation(),
-        ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom))
-    } else {
-      Map(
-        ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> DescriptorProperties.noValidation(),
-        ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED -> toJava(watermarkPeriodicBounded),
-        ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom))
-    }
-
-    properties.validateEnum(
-      prefix + ROWTIME_WATERMARKS_TYPE,
-      false,
-      watermarksValidation.asJava
-    )
-  }
-}
-
-object RowtimeValidator {
-
-  // utilities
-
-  def getRowtimeComponents(properties: DescriptorProperties, prefix: String)
-    : Option[(TimestampExtractor, WatermarkStrategy)] = {
-
-    // create timestamp extractor
-    val t = properties.getOptionalString(prefix + ROWTIME_TIMESTAMPS_TYPE)
-    if (!t.isPresent) {
-      return None
-    }
-    val extractor: TimestampExtractor = t.get() match {
-
-      case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD =>
-        val field = properties.getString(prefix + ROWTIME_TIMESTAMPS_FROM)
-        new ExistingField(field)
-
-      case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE =>
-        StreamRecordTimestamp.INSTANCE
-
-      case ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM =>
-        val clazz = properties.getClass(
-          prefix + ROWTIME_TIMESTAMPS_CLASS,
-          classOf[TimestampExtractor])
-        EncodingUtils.decodeStringToObject(
-          properties.getString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED),
-          clazz)
-    }
-
-    // create watermark strategy
-    val s = properties.getString(prefix + ROWTIME_WATERMARKS_TYPE)
-    val strategy: WatermarkStrategy = s match {
-
-      case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING =>
-        new AscendingTimestamps()
-
-      case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED =>
-        val delay = properties.getLong(prefix + ROWTIME_WATERMARKS_DELAY)
-        new BoundedOutOfOrderTimestamps(delay)
-
-      case ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE =>
-        PreserveWatermarks.INSTANCE
-
-      case ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM =>
-        val clazz = properties.getClass(
-          prefix + ROWTIME_WATERMARKS_CLASS,
-          classOf[WatermarkStrategy])
-        EncodingUtils.decodeStringToObject(
-          properties.getString(prefix + ROWTIME_WATERMARKS_SERIALIZED),
-          clazz)
-    }
-
-    Some((extractor, strategy))
-  }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
deleted file mode 100644
index f5769bf..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-import java.util.Optional
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.api.{TableException, TableSchema, ValidationException}
-import org.apache.flink.table.descriptors.Rowtime._
-import org.apache.flink.table.descriptors.Schema._
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor
-import org.apache.flink.table.util.JavaScalaConversionUtil.{toJava, toScala}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/**
-  * Validator for [[Schema]].
-  */
-class SchemaValidator(
-    isStreamEnvironment: Boolean,
-    supportsSourceTimestamps: Boolean,
-    supportsSourceWatermarks: Boolean)
-  extends DescriptorValidator {
-
-  override def validate(properties: DescriptorProperties): Unit = {
-    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
-    val types = properties.getIndexedProperty(SCHEMA, SCHEMA_TYPE)
-
-    if (names.isEmpty && types.isEmpty) {
-      throw new ValidationException(
-        s"Could not find the required schema in property '$SCHEMA'.")
-    }
-
-    var proctimeFound = false
-
-    for (i <- 0 until Math.max(names.size, types.size)) {
-      properties
-        .validateString(s"$SCHEMA.$i.$SCHEMA_NAME", false, 1)
-      properties
-        .validateType(s"$SCHEMA.$i.$SCHEMA_TYPE", false, false)
-      properties
-        .validateString(s"$SCHEMA.$i.$SCHEMA_FROM", true, 1)
-      // either proctime or rowtime
-      val proctime = s"$SCHEMA.$i.$SCHEMA_PROCTIME"
-      val rowtime = s"$SCHEMA.$i.$ROWTIME"
-      if (properties.containsKey(proctime)) {
-        // check the environment
-        if (!isStreamEnvironment) {
-          throw new ValidationException(
-            s"Property '$proctime' is not allowed in a batch environment.")
-        }
-        // check for only one proctime attribute
-        else if (proctimeFound) {
-          throw new ValidationException("A proctime attribute must only be defined once.")
-        }
-        // check proctime
-        properties.validateBoolean(proctime, false)
-        proctimeFound = properties.getBoolean(proctime)
-        // no rowtime
-        properties.validatePrefixExclusion(rowtime)
-      } else if (properties.hasPrefix(rowtime)) {
-        // check rowtime
-        val rowtimeValidator = new RowtimeValidator(
-          supportsSourceTimestamps,
-          supportsSourceWatermarks,
-          s"$SCHEMA.$i.")
-        rowtimeValidator.validate(properties)
-        // no proctime
-        properties.validateExclusion(proctime)
-      }
-    }
-  }
-}
-
-object SchemaValidator {
-
-  /**
-    * Returns keys for a
-    * [[org.apache.flink.table.factories.TableFormatFactory.supportedProperties()]] method that
-    * are accepted for schema derivation using [[deriveFormatFields(DescriptorProperties)]].
-    */
-  def getSchemaDerivationKeys: util.List[String] = {
-    val keys = new util.ArrayList[String]()
-
-    // schema
-    keys.add(SCHEMA + ".#." + SCHEMA_TYPE)
-    keys.add(SCHEMA + ".#." + SCHEMA_NAME)
-    keys.add(SCHEMA + ".#." + SCHEMA_FROM)
-
-    // time attributes
-    keys.add(SCHEMA + ".#." + SCHEMA_PROCTIME)
-    keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE)
-    keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM)
-    keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS)
-    keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED)
-    keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE)
-    keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS)
-    keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED)
-    keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY)
-
-    keys
-  }
-
-  // utilities
-
-  /**
-    * Finds the proctime attribute if defined.
-    */
-  def deriveProctimeAttribute(properties: DescriptorProperties): Optional[String] = {
-    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
-
-    for (i <- 0 until names.size) {
-      val isProctime = toScala(
-        properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
-      isProctime.foreach { isSet =>
-        if (isSet) {
-          return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
-        }
-      }
-    }
-    toJava(None)
-  }
-
-  /**
-    * Finds the rowtime attributes if defined.
-    */
-  def deriveRowtimeAttributes(properties: DescriptorProperties)
-    : util.List[RowtimeAttributeDescriptor] = {
-
-    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
-
-    var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
-
-    // check for rowtime in every field
-    for (i <- 0 until names.size) {
-      RowtimeValidator
-        .getRowtimeComponents(properties, s"$SCHEMA.$i.")
-        .foreach { case (extractor, strategy) =>
-          // create descriptor
-          attributes += new RowtimeAttributeDescriptor(
-            properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
-            extractor,
-            strategy)
-        }
-    }
-
-    attributes.asJava
-  }
-
-  /**
-    * Derives the table schema for a table sink. A sink ignores a proctime attribute and
-    * needs to track the origin of a rowtime field.
-    *
-    * @deprecated This method combines two separate concepts of table schema and field mapping.
-    *             This should be split into two methods once we have support for
-    *             the corresponding interfaces (see FLINK-9870).
-    */
-  @deprecated
-  def deriveTableSinkSchema(properties: DescriptorProperties): TableSchema = {
-    val builder = TableSchema.builder()
-
-    val schema = properties.getTableSchema(SCHEMA)
-
-    schema.getFieldNames.zip(schema.getFieldTypes).zipWithIndex.foreach { case ((n, t), i) =>
-      val isProctime = properties
-        .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
-        .orElse(false)
-      val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE"
-      val isRowtime = properties.containsKey(tsType)
-      if (!isProctime && !isRowtime) {
-        // check for a aliasing
-        val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
-          .orElse(n)
-        builder.field(fieldName, t)
-      }
-      // only use the rowtime attribute if it references a field
-      else if (isRowtime) {
-        properties.getString(tsType) match {
-          case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD =>
-            val field = properties.getString(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_FROM")
-            builder.field(field, t)
-
-          // other timestamp strategies require a reverse timestamp extractor to
-          // insert the timestamp into the output
-          case t@_ =>
-            throw new TableException(
-              s"Unsupported rowtime type '$t' for sink table schema. Currently " +
-              s"only '$ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD' is supported for table sinks.")
-        }
-      }
-    }
-
-    builder.build()
-  }
-
-  /**
-    * Finds a table source field mapping.
-    *
-    * @param properties The properties describing a schema.
-    * @param inputType  The input type that a connector and/or format produces. This parameter
-    *                   can be used to resolve a rowtime field against an input field.
-    */
-  def deriveFieldMapping(
-      properties: DescriptorProperties,
-      inputType: Optional[TypeInformation[_]])
-    : util.Map[String, String] = {
-
-    val mapping = mutable.Map[String, String]()
-
-    val schema = properties.getTableSchema(SCHEMA)
-
-    val columnNames = toScala(inputType) match {
-      case Some(composite: CompositeType[_]) => composite.getFieldNames.toSeq
-      case _ => Seq[String]()
-    }
-
-    // add all source fields first because rowtime might reference one of them
-    columnNames.foreach(name => mapping.put(name, name))
-
-    // add all schema fields first for implicit mappings
-    schema.getFieldNames.foreach { name =>
-      mapping.put(name, name)
-    }
-
-    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
-
-    for (i <- 0 until names.size) {
-      val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
-      toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) match {
-
-        // add explicit mapping
-        case Some(source) =>
-          mapping.put(name, source)
-
-        // implicit mapping or time
-        case None =>
-          val isProctime = properties
-            .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
-            .orElse(false)
-          val isRowtime = properties
-            .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
-          // remove proctime/rowtime from mapping
-          if (isProctime || isRowtime) {
-            mapping.remove(name)
-          }
-          // check for invalid fields
-          else if (!columnNames.contains(name)) {
-            throw new ValidationException(s"Could not map the schema field '$name' to a field " +
-              s"from source. Please specify the source field from which it can be derived.")
-          }
-      }
-    }
-
-    mapping.toMap.asJava
-  }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
index 1365eae..3aa114e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
@@ -70,10 +70,7 @@ abstract class CsvTableSinkFactoryBase extends TableFactory {
     // validate
     new FileSystemValidator().validate(params)
     new OldCsvValidator().validate(params)
-    new SchemaValidator(
-      isStreaming,
-      supportsSourceTimestamps = false,
-      supportsSourceWatermarks = false).validate(params)
+    new SchemaValidator(isStreaming, false, false).validate(params)
 
     // build
     val formatSchema = params.getTableSchema(FORMAT_FIELDS)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
index 41c9698..a7a48bb 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
@@ -75,10 +75,7 @@ abstract class CsvTableSourceFactoryBase extends TableFactory {
     // validate
     new FileSystemValidator().validate(params)
     new OldCsvValidator().validate(params)
-    new SchemaValidator(
-      isStreaming,
-      supportsSourceTimestamps = false,
-      supportsSourceWatermarks = false).validate(params)
+    new SchemaValidator(isStreaming, false, false).validate(params)
 
     // build
     val csvTableSourceBuilder = new CsvTableSource.Builder
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
index 0f51c76..8bd327c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
@@ -71,7 +71,7 @@ class RowtimeTest extends DescriptorTestBase {
   }
 
   override def validator(): DescriptorValidator = {
-    new RowtimeValidator(supportsSourceTimestamps = true, supportsSourceWatermarks = false)
+    new RowtimeValidator(true, false)
   }
 
   override def properties(): util.List[util.Map[String, String]] = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
index 8bf9709..3ab1a67 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
@@ -61,10 +61,7 @@ class SchemaTest extends DescriptorTestBase {
   }
 
   override def validator(): DescriptorValidator = {
-    new SchemaValidator(
-      isStreamEnvironment = true,
-      supportsSourceTimestamps = true,
-      supportsSourceWatermarks = true)
+    new SchemaValidator(true, true, true)
   }
 
   override def properties(): util.List[util.Map[String, String]] = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
index 28f68d5..6de7525 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
@@ -51,10 +51,7 @@ class InMemoryTableFactory(terminationCount: Int)
     params.putProperties(properties)
 
     // validate
-    new SchemaValidator(
-      isStreamEnvironment = true,
-      supportsSourceTimestamps = true,
-      supportsSourceWatermarks = true).validate(params)
+    new SchemaValidator(true, true, true).validate(params)
 
     val tableSchema = SchemaValidator.deriveTableSinkSchema(params)
 
@@ -71,10 +68,7 @@ class InMemoryTableFactory(terminationCount: Int)
     params.putProperties(properties)
 
     // validate
-    new SchemaValidator(
-      isStreamEnvironment = true,
-      supportsSourceTimestamps = true,
-      supportsSourceWatermarks = true).validate(params)
+    new SchemaValidator(true, true, true).validate(params)
 
     val tableSchema = params.getTableSchema(SCHEMA)
 


[flink] 02/02: [FLINK-13286][table-common] Port FileSystem and FileSystemValidator to common

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0b53eef53271e1f73cfe3d7e0b4fe93fb2b74b5f
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jul 18 20:59:40 2019 +0800

    [FLINK-13286][table-common] Port FileSystem and FileSystemValidator to common
---
 .../apache/flink/table/descriptors/FileSystem.java | 58 ++++++++++++++++++++
 .../table/descriptors/FileSystemValidator.java}    | 31 +++++------
 .../flink/table/descriptors/FileSystem.scala       | 64 ----------------------
 .../flink/table/descriptors/FileSystemTest.scala   |  2 +-
 .../table/descriptors/TableDescriptorTest.scala    |  2 +-
 .../flink/table/runtime/utils/CommonTestData.scala |  6 +-
 6 files changed, 77 insertions(+), 86 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystem.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystem.java
new file mode 100644
index 0000000..291c0f9
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystem.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_PATH;
+import static org.apache.flink.table.descriptors.FileSystemValidator.CONNECTOR_TYPE_VALUE;
+
+/**
+ * Connector descriptor for a file system.
+ */
+@PublicEvolving
+public class FileSystem extends ConnectorDescriptor {
+
+	private String path = null;
+
+	public FileSystem() {
+		super(CONNECTOR_TYPE_VALUE, 1, true);
+	}
+
+	/**
+	 * Sets the path to a file or directory in a file system.
+	 *
+	 * @param path the path to a file or directory
+	 */
+	public FileSystem path(String path) {
+		this.path = path;
+		return this;
+	}
+
+	@Override
+	protected Map<String, String> toConnectorProperties() {
+		DescriptorProperties properties = new DescriptorProperties();
+		if (path != null) {
+			properties.putString(CONNECTOR_PATH, path);
+		}
+		return properties.asMap();
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystemValidator.java
similarity index 54%
rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystemValidator.java
index 4d1b7de..d0e2e1b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystemValidator.scala
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/FileSystemValidator.java
@@ -16,26 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.descriptors
+package org.apache.flink.table.descriptors;
 
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE
-import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
+import org.apache.flink.annotation.PublicEvolving;
 
 /**
-  * Validator for [[FileSystem]].
-  */
-class FileSystemValidator extends ConnectorDescriptorValidator {
-
-  override def validate(properties: DescriptorProperties): Unit = {
-    super.validate(properties)
-    properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE, false)
-    properties.validateString(CONNECTOR_PATH, false, 1)
-  }
-}
-
-object FileSystemValidator {
+ * Validator for {@link FileSystem}.
+ */
+@PublicEvolving
+public class FileSystemValidator extends ConnectorDescriptorValidator {
 
-  val CONNECTOR_TYPE_VALUE = "filesystem"
-  val CONNECTOR_PATH = "connector.path"
+	public static final String CONNECTOR_TYPE_VALUE = "filesystem";
+	public static final String CONNECTOR_PATH = "connector.path";
 
+	@Override
+	public void validate(DescriptorProperties properties) {
+		super.validate(properties);
+		properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE, false);
+		properties.validateString(CONNECTOR_PATH, false, 1);
+	}
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
deleted file mode 100644
index 77cf27b..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/FileSystem.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-
-import org.apache.flink.table.descriptors.FileSystemValidator.{CONNECTOR_PATH, CONNECTOR_TYPE_VALUE}
-
-/**
-  * Connector descriptor for a file system.
-  */
-class FileSystem extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, 1, true) {
-
-  private var path: Option[String] = None
-
-  /**
-    * Sets the path to a file or directory in a file system.
-    *
-    * @param path the path a file or directory
-    */
-  def path(path: String): FileSystem = {
-    this.path = Some(path)
-    this
-  }
-
-  override protected def toConnectorProperties: util.Map[String, String] = {
-    val properties = new DescriptorProperties()
-
-    path.foreach(properties.putString(CONNECTOR_PATH, _))
-
-    properties.asMap()
-  }
-}
-
-/**
-  * Connector descriptor for a file system.
-  */
-object FileSystem {
-
-  /**
-    * Connector descriptor for a file system.
-    *
-    * @deprecated Use `new FileSystem()`.
-    */
-  @deprecated
-  def apply(): FileSystem = new FileSystem()
-  
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
index 1162694..d232a0d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/FileSystemTest.scala
@@ -40,7 +40,7 @@ class FileSystemTest extends DescriptorTestBase {
   // ----------------------------------------------------------------------------------------------
 
   override def descriptors(): util.List[Descriptor] = {
-    util.Arrays.asList(FileSystem().path("/myfile"))
+    util.Arrays.asList(new FileSystem().path("/myfile"))
   }
 
   override def validator(): DescriptorValidator = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
index df4d3fc..555a030 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/TableDescriptorTest.scala
@@ -56,7 +56,7 @@ class TableDescriptorTest extends TableTestBase {
     //  schema.field("proctime", Types.SQL_TIMESTAMP).proctime()
     //}
 
-    val connector = FileSystem()
+    val connector = new FileSystem()
       .path("/path/to/csv")
 
     val format = OldCsv()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index 0d633fe..b5ada5d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -67,7 +67,7 @@ object CommonTestData {
     )
     val tempFilePath1 = writeToTempFile(csvRecord1.mkString("\n"), "csv-test1", "tmp")
 
-    val connDesc1 = FileSystem().path(tempFilePath1)
+    val connDesc1 = new FileSystem().path(tempFilePath1)
     val formatDesc1 = OldCsv()
       .field("a", Types.INT)
       .field("b", Types.LONG)
@@ -106,7 +106,7 @@ object CommonTestData {
     )
     val tempFilePath2 = writeToTempFile(csvRecord2.mkString("\n"), "csv-test2", "tmp")
 
-    val connDesc2 = FileSystem().path(tempFilePath2)
+    val connDesc2 = new FileSystem().path(tempFilePath2)
     val formatDesc2 = OldCsv()
       .field("d", Types.INT)
       .field("e", Types.LONG)
@@ -131,7 +131,7 @@ object CommonTestData {
     }
 
     val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
-    val connDesc3 = FileSystem().path(tempFilePath3)
+    val connDesc3 = new FileSystem().path(tempFilePath3)
     val formatDesc3 = OldCsv()
       .field("x", Types.INT)
       .field("y", Types.LONG)