You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/19 13:35:49 UTC

[flink] 01/02: [FLINK-13286][table-api] Port RowtimeValidator and SchemaValidator to api-java-bridge

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 86ece85369ad3e53e23dda57a0e95483942a30b0
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Jul 18 20:57:12 2019 +0800

    [FLINK-13286][table-api] Port RowtimeValidator and SchemaValidator to api-java-bridge
    
    This closes #9168
---
 .../flink/table/descriptors/RowtimeValidator.java  | 176 +++++++++++++
 .../flink/table/descriptors/SchemaValidator.java   | 287 +++++++++++++++++++++
 .../flink/table/descriptors/RowtimeValidator.scala | 156 -----------
 .../flink/table/descriptors/SchemaValidator.scala  | 275 --------------------
 .../table/sinks/CsvTableSinkFactoryBase.scala      |   5 +-
 .../table/sources/CsvTableSourceFactoryBase.scala  |   5 +-
 .../flink/table/descriptors/RowtimeTest.scala      |   2 +-
 .../flink/table/descriptors/SchemaTest.scala       |   5 +-
 .../flink/table/utils/InMemoryTableFactory.scala   |  10 +-
 9 files changed, 469 insertions(+), 452 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/RowtimeValidator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/RowtimeValidator.java
new file mode 100644
index 0000000..9061a02
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/RowtimeValidator.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sources.tsextractors.ExistingField;
+import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
+import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
+import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps;
+import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks;
+import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED;
+
+/**
+ * Validator for {@link Rowtime}.
+ */
+@PublicEvolving
+public class RowtimeValidator implements DescriptorValidator {
+
+	private final boolean supportsSourceTimestamps;
+	private final boolean supportsSourceWatermarks;
+	private final String prefix;
+
+	public RowtimeValidator(boolean supportsSourceTimestamps, boolean supportsSourceWatermarks) {
+		this(supportsSourceTimestamps, supportsSourceWatermarks, "");
+	}
+
+	public RowtimeValidator(boolean supportsSourceTimestamps, boolean supportsSourceWatermarks, String prefix) {
+		this.supportsSourceTimestamps = supportsSourceTimestamps;
+		this.supportsSourceWatermarks = supportsSourceWatermarks;
+		this.prefix = prefix;
+	}
+
+	@Override
+	public void validate(DescriptorProperties properties) {
+		Consumer<String> timestampExistingField =
+				s -> properties.validateString(prefix + ROWTIME_TIMESTAMPS_FROM, false, 1);
+
+		Consumer<String> timestampCustom = s -> {
+			properties.validateString(prefix + ROWTIME_TIMESTAMPS_CLASS, false, 1);
+			properties.validateString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED, false, 1);
+		};
+
+		Map<String, Consumer<String>> timestampsValidation = new HashMap<>();
+		if (supportsSourceTimestamps) {
+			timestampsValidation.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD, timestampExistingField);
+			timestampsValidation.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE, DescriptorProperties.noValidation());
+			timestampsValidation.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM, timestampCustom);
+		} else {
+			timestampsValidation.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD, timestampExistingField);
+			timestampsValidation.put(ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM, timestampCustom);
+		}
+
+		properties.validateEnum(prefix + ROWTIME_TIMESTAMPS_TYPE, false, timestampsValidation);
+
+		Consumer<String> watermarkPeriodicBounded =
+				s -> properties.validateLong(prefix + ROWTIME_WATERMARKS_DELAY, false, 0);
+
+		Consumer<String> watermarkCustom = s -> {
+			properties.validateString(prefix + ROWTIME_WATERMARKS_CLASS, false, 1);
+			properties.validateString(prefix + ROWTIME_WATERMARKS_SERIALIZED, false, 1);
+		};
+
+		Map<String, Consumer<String>> watermarksValidation = new HashMap<>();
+		if (supportsSourceWatermarks) {
+			watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING, DescriptorProperties.noValidation());
+			watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED, watermarkPeriodicBounded);
+			watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE, DescriptorProperties.noValidation());
+			watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM, watermarkCustom);
+		} else {
+			watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING, DescriptorProperties.noValidation());
+			watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED, watermarkPeriodicBounded);
+			watermarksValidation.put(ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM, watermarkCustom);
+		}
+
+		properties.validateEnum(prefix + ROWTIME_WATERMARKS_TYPE, false, watermarksValidation);
+	}
+
+	// utilities
+
+	public static Optional<Tuple2<TimestampExtractor, WatermarkStrategy>> getRowtimeComponents(
+			DescriptorProperties properties, String prefix) {
+		// create timestamp extractor
+		TimestampExtractor extractor;
+		Optional<String> t = properties.getOptionalString(prefix + ROWTIME_TIMESTAMPS_TYPE);
+		if (!t.isPresent()) {
+			return Optional.empty();
+		}
+
+		switch (t.get()) {
+			case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD:
+				String field = properties.getString(prefix + ROWTIME_TIMESTAMPS_FROM);
+				extractor = new ExistingField(field);
+				break;
+			case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE:
+				extractor = StreamRecordTimestamp.INSTANCE;
+				break;
+			case ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM:
+				Class<TimestampExtractor> clazz = properties.getClass(
+						prefix + ROWTIME_TIMESTAMPS_CLASS, TimestampExtractor.class);
+				extractor = EncodingUtils.decodeStringToObject(
+						properties.getString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED),
+						clazz);
+				break;
+			default:
+				throw new ValidationException("Unsupported rowtime timestamps type: " + t.get());
+		}
+
+		// create watermark strategy
+		WatermarkStrategy strategy;
+		String s = properties.getString(prefix + ROWTIME_WATERMARKS_TYPE);
+		switch (s) {
+			case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING:
+				strategy = new AscendingTimestamps();
+				break;
+			case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED:
+				long delay = properties.getLong(prefix + ROWTIME_WATERMARKS_DELAY);
+				strategy = new BoundedOutOfOrderTimestamps(delay);
+				break;
+			case ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE:
+				strategy = PreserveWatermarks.INSTANCE;
+				break;
+			case ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM:
+				Class<WatermarkStrategy> clazz = properties.getClass(
+						prefix + ROWTIME_WATERMARKS_CLASS, WatermarkStrategy.class);
+				strategy = EncodingUtils.decodeStringToObject(
+						properties.getString(prefix + ROWTIME_WATERMARKS_SERIALIZED),
+						clazz);
+				break;
+			default:
+				throw new RuntimeException("Unsupported rowtime timestamps type: " + s);
+		}
+
+		return Optional.of(new Tuple2<>(extractor, strategy));
+	}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
new file mode 100644
index 0000000..2ad92c1
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/SchemaValidator.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.TableFormatFactory;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
+import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.lang.String.format;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Validator for {@link Schema}.
+ */
+@PublicEvolving
+public class SchemaValidator implements DescriptorValidator {
+
+	private final boolean isStreamEnvironment;
+	private final boolean supportsSourceTimestamps;
+	private final boolean supportsSourceWatermarks;
+
+	public SchemaValidator(boolean isStreamEnvironment, boolean supportsSourceTimestamps,
+			boolean supportsSourceWatermarks) {
+		this.isStreamEnvironment = isStreamEnvironment;
+		this.supportsSourceTimestamps = supportsSourceTimestamps;
+		this.supportsSourceWatermarks = supportsSourceWatermarks;
+	}
+
+	@Override
+	public void validate(DescriptorProperties properties) {
+		Map<String, String> names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME);
+		Map<String, String> types = properties.getIndexedProperty(SCHEMA, SCHEMA_TYPE);
+
+		if (names.isEmpty() && types.isEmpty()) {
+			throw new ValidationException(
+					format("Could not find the required schema in property '%s'.", SCHEMA));
+		}
+
+		boolean proctimeFound = false;
+
+		for (int i = 0; i < Math.max(names.size(), types.size()); i++) {
+			properties.validateString(SCHEMA + "." + i + "." + SCHEMA_NAME, false, 1);
+			properties.validateType(SCHEMA + "." + i + "." + SCHEMA_TYPE, false, false);
+			properties.validateString(SCHEMA + "." + i + "." + SCHEMA_FROM, true, 1);
+			// either proctime or rowtime
+			String proctime = SCHEMA + "." + i + "." + SCHEMA_PROCTIME;
+			String rowtime = SCHEMA + "." + i + "." + ROWTIME;
+			if (properties.containsKey(proctime)) {
+				// check the environment
+				if (!isStreamEnvironment) {
+					throw new ValidationException(
+							format("Property '%s' is not allowed in a batch environment.", proctime));
+				}
+				// check for only one proctime attribute
+				else if (proctimeFound) {
+					throw new ValidationException("A proctime attribute must only be defined once.");
+				}
+				// check proctime
+				properties.validateBoolean(proctime, false);
+				proctimeFound = properties.getBoolean(proctime);
+				// no rowtime
+				properties.validatePrefixExclusion(rowtime);
+			} else if (properties.hasPrefix(rowtime)) {
+				// check rowtime
+				RowtimeValidator rowtimeValidator = new RowtimeValidator(
+						supportsSourceTimestamps,
+						supportsSourceWatermarks,
+						SCHEMA + "." + i + ".");
+				rowtimeValidator.validate(properties);
+				// no proctime
+				properties.validateExclusion(proctime);
+			}
+		}
+	}
+
+	/**
+	 * Returns keys for a {@link TableFormatFactory#supportedProperties()} method that
+	 * are accepted for schema derivation using {@code deriveFormatFields(DescriptorProperties)}.
+	 */
+	public static List<String> getSchemaDerivationKeys() {
+		List<String> keys = new ArrayList<>();
+
+		// schema
+		keys.add(SCHEMA + ".#." + SCHEMA_TYPE);
+		keys.add(SCHEMA + ".#." + SCHEMA_NAME);
+		keys.add(SCHEMA + ".#." + SCHEMA_FROM);
+
+		// time attributes
+		keys.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
+		keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
+		keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
+		keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS);
+		keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED);
+		keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
+		keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS);
+		keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED);
+		keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY);
+
+		return keys;
+	}
+
+	/**
+	 * Finds the proctime attribute if defined.
+	 */
+	public static Optional<String> deriveProctimeAttribute(DescriptorProperties properties) {
+		Map<String, String> names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME);
+
+		for (int i = 0; i < names.size(); i++) {
+			Optional<Boolean> isProctime = properties.getOptionalBoolean(SCHEMA + "." + i + "." + SCHEMA_PROCTIME);
+			if (isProctime.isPresent() && isProctime.get()) {
+				return Optional.of(names.get(SCHEMA + "." + i + "." + SCHEMA_NAME));
+			}
+		}
+		return Optional.empty();
+	}
+
+	/**
+	 * Finds the rowtime attributes if defined.
+	 */
+	public static List<RowtimeAttributeDescriptor> deriveRowtimeAttributes(
+			DescriptorProperties properties) {
+
+		Map<String, String> names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME);
+
+		List<RowtimeAttributeDescriptor> attributes = new ArrayList<>();
+
+		// check for rowtime in every field
+		for (int i = 0; i < names.size(); i++) {
+			Optional<Tuple2<TimestampExtractor, WatermarkStrategy>> rowtimeComponents = RowtimeValidator
+					.getRowtimeComponents(properties, SCHEMA + "." + i + ".");
+			int index = i;
+			// create descriptor
+			rowtimeComponents.ifPresent(tuple2 -> attributes.add(new RowtimeAttributeDescriptor(
+					properties.getString(SCHEMA + "." + index + "." + SCHEMA_NAME),
+					tuple2.f0,
+					tuple2.f1))
+			);
+		}
+
+		return attributes;
+	}
+
+	/**
+	 * Derives the table schema for a table sink. A sink ignores a proctime attribute and
+	 * needs to track the origin of a rowtime field.
+	 *
+	 * @deprecated This method combines two separate concepts of table schema and field mapping.
+	 *             This should be split into two methods once we have support for
+	 *             the corresponding interfaces (see FLINK-9870).
+	 */
+	@Deprecated
+	public static TableSchema deriveTableSinkSchema(DescriptorProperties properties) {
+		TableSchema.Builder builder = TableSchema.builder();
+
+		TableSchema schema = properties.getTableSchema(SCHEMA);
+
+		for (int i = 0; i < schema.getFieldCount(); i++) {
+			TypeInformation t = schema.getFieldTypes()[i];
+			String n = schema.getFieldNames()[i];
+			boolean isProctime = properties
+					.getOptionalBoolean(SCHEMA + "." + i + "." + SCHEMA_PROCTIME)
+					.orElse(false);
+			String tsType = SCHEMA + "." + i + "." + ROWTIME_TIMESTAMPS_TYPE;
+			boolean isRowtime = properties.containsKey(tsType);
+			if (!isProctime && !isRowtime) {
+				// check for a aliasing
+				String fieldName = properties.getOptionalString(SCHEMA + "." + i + "." + SCHEMA_FROM)
+						.orElse(n);
+				builder.field(fieldName, t);
+			}
+			// only use the rowtime attribute if it references a field
+			else if (isRowtime) {
+				switch (properties.getString(tsType)) {
+					case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD:
+						String field = properties.getString(SCHEMA + "." + i + "." + ROWTIME_TIMESTAMPS_FROM);
+						builder.field(field, t);
+						break;
+					// other timestamp strategies require a reverse timestamp extractor to
+					// insert the timestamp into the output
+					default:
+						throw new TableException(format("Unsupported rowtime type '%s' for sink" +
+								" table schema. Currently only '%s' is supported for table sinks.",
+								t, ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD));
+				}
+			}
+		}
+
+		return builder.build();
+	}
+
+	/**
+	 * Finds a table source field mapping.
+	 *
+	 * @param properties The properties describing a schema.
+	 * @param inputType  The input type that a connector and/or format produces. This parameter
+	 *                   can be used to resolve a rowtime field against an input field.
+	 */
+	public static Map<String, String> deriveFieldMapping(
+			DescriptorProperties properties, Optional<TypeInformation<?>> inputType) {
+
+		Map<String, String> mapping = new HashMap<>();
+
+		TableSchema schema = properties.getTableSchema(SCHEMA);
+
+		List<String> columnNames = new ArrayList<>();
+		inputType.ifPresent(t -> columnNames.addAll(Arrays.asList(((CompositeType) t).getFieldNames())));
+
+		// add all source fields first because rowtime might reference one of them
+		columnNames.forEach(name -> mapping.put(name, name));
+
+		// add all schema fields first for implicit mappings
+		Arrays.stream(schema.getFieldNames()).forEach(name -> mapping.put(name, name));
+
+		Map<String, String> names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME);
+
+		for (int i = 0; i < names.size(); i++) {
+			String name = properties.getString(SCHEMA + "." + i + "." + SCHEMA_NAME);
+			Optional<String> source = properties.getOptionalString(SCHEMA + "." + i + "." + SCHEMA_FROM);
+			if (source.isPresent()) {
+				// add explicit mapping
+				mapping.put(name, source.get());
+			} else { // implicit mapping or time
+				boolean isProctime = properties
+						.getOptionalBoolean(SCHEMA + "." + i + "." + SCHEMA_PROCTIME)
+						.orElse(false);
+				boolean isRowtime = properties
+						.containsKey(SCHEMA + "." + i + "." + ROWTIME_TIMESTAMPS_TYPE);
+				// remove proctime/rowtime from mapping
+				if (isProctime || isRowtime) {
+					mapping.remove(name);
+				}
+				// check for invalid fields
+				else if (!columnNames.contains(name)) {
+					throw new ValidationException(format("Could not map the schema field '%s' to a field " +
+							"from source. Please specify the source field from which it can be derived.", name));
+				}
+			}
+		}
+
+		return mapping;
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
deleted file mode 100644
index a3b6e8d..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.descriptors.Rowtime._
-import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
-import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
-import org.apache.flink.table.util.JavaScalaConversionUtil.toJava
-import org.apache.flink.table.utils.EncodingUtils
-
-import scala.collection.JavaConverters._
-
-/**
-  * Validator for [[Rowtime]].
-  */
-class RowtimeValidator(
-    supportsSourceTimestamps: Boolean,
-    supportsSourceWatermarks: Boolean,
-    prefix: String = "")
-  extends DescriptorValidator {
-
-  override def validate(properties: DescriptorProperties): Unit = {
-    val timestampExistingField = (_: String) => {
-      properties.validateString(
-        prefix + ROWTIME_TIMESTAMPS_FROM, false, 1)
-    }
-
-    val timestampCustom = (_: String) => {
-      properties.validateString(
-        prefix + ROWTIME_TIMESTAMPS_CLASS, false, 1)
-      properties.validateString(
-        prefix + ROWTIME_TIMESTAMPS_SERIALIZED, false, 1)
-    }
-
-    val timestampsValidation = if (supportsSourceTimestamps) {
-      Map(
-        ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> toJava(timestampExistingField),
-        ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE -> DescriptorProperties.noValidation(),
-        ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom))
-    } else {
-      Map(
-        ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD -> toJava(timestampExistingField),
-        ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM -> toJava(timestampCustom))
-    }
-
-    properties.validateEnum(
-      prefix + ROWTIME_TIMESTAMPS_TYPE,
-      false,
-      timestampsValidation.asJava
-    )
-
-    val watermarkPeriodicBounded = (_: String) => {
-      properties.validateLong(
-        prefix + ROWTIME_WATERMARKS_DELAY, false, 0)
-    }
-
-    val watermarkCustom = (_: String) => {
-      properties.validateString(
-        prefix + ROWTIME_WATERMARKS_CLASS, false, 1)
-      properties.validateString(
-        prefix + ROWTIME_WATERMARKS_SERIALIZED, false, 1)
-    }
-
-    val watermarksValidation = if (supportsSourceWatermarks) {
-      Map(
-        ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> DescriptorProperties.noValidation(),
-        ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED -> toJava(watermarkPeriodicBounded),
-        ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE -> DescriptorProperties.noValidation(),
-        ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom))
-    } else {
-      Map(
-        ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING -> DescriptorProperties.noValidation(),
-        ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED -> toJava(watermarkPeriodicBounded),
-        ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM -> toJava(watermarkCustom))
-    }
-
-    properties.validateEnum(
-      prefix + ROWTIME_WATERMARKS_TYPE,
-      false,
-      watermarksValidation.asJava
-    )
-  }
-}
-
-object RowtimeValidator {
-
-  // utilities
-
-  def getRowtimeComponents(properties: DescriptorProperties, prefix: String)
-    : Option[(TimestampExtractor, WatermarkStrategy)] = {
-
-    // create timestamp extractor
-    val t = properties.getOptionalString(prefix + ROWTIME_TIMESTAMPS_TYPE)
-    if (!t.isPresent) {
-      return None
-    }
-    val extractor: TimestampExtractor = t.get() match {
-
-      case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD =>
-        val field = properties.getString(prefix + ROWTIME_TIMESTAMPS_FROM)
-        new ExistingField(field)
-
-      case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE =>
-        StreamRecordTimestamp.INSTANCE
-
-      case ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM =>
-        val clazz = properties.getClass(
-          prefix + ROWTIME_TIMESTAMPS_CLASS,
-          classOf[TimestampExtractor])
-        EncodingUtils.decodeStringToObject(
-          properties.getString(prefix + ROWTIME_TIMESTAMPS_SERIALIZED),
-          clazz)
-    }
-
-    // create watermark strategy
-    val s = properties.getString(prefix + ROWTIME_WATERMARKS_TYPE)
-    val strategy: WatermarkStrategy = s match {
-
-      case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING =>
-        new AscendingTimestamps()
-
-      case ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDED =>
-        val delay = properties.getLong(prefix + ROWTIME_WATERMARKS_DELAY)
-        new BoundedOutOfOrderTimestamps(delay)
-
-      case ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE =>
-        PreserveWatermarks.INSTANCE
-
-      case ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM =>
-        val clazz = properties.getClass(
-          prefix + ROWTIME_WATERMARKS_CLASS,
-          classOf[WatermarkStrategy])
-        EncodingUtils.decodeStringToObject(
-          properties.getString(prefix + ROWTIME_WATERMARKS_SERIALIZED),
-          clazz)
-    }
-
-    Some((extractor, strategy))
-  }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
deleted file mode 100644
index f5769bf..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-import java.util.Optional
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.api.{TableException, TableSchema, ValidationException}
-import org.apache.flink.table.descriptors.Rowtime._
-import org.apache.flink.table.descriptors.Schema._
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor
-import org.apache.flink.table.util.JavaScalaConversionUtil.{toJava, toScala}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/**
-  * Validator for [[Schema]].
-  */
-class SchemaValidator(
-    isStreamEnvironment: Boolean,
-    supportsSourceTimestamps: Boolean,
-    supportsSourceWatermarks: Boolean)
-  extends DescriptorValidator {
-
-  override def validate(properties: DescriptorProperties): Unit = {
-    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
-    val types = properties.getIndexedProperty(SCHEMA, SCHEMA_TYPE)
-
-    if (names.isEmpty && types.isEmpty) {
-      throw new ValidationException(
-        s"Could not find the required schema in property '$SCHEMA'.")
-    }
-
-    var proctimeFound = false
-
-    for (i <- 0 until Math.max(names.size, types.size)) {
-      properties
-        .validateString(s"$SCHEMA.$i.$SCHEMA_NAME", false, 1)
-      properties
-        .validateType(s"$SCHEMA.$i.$SCHEMA_TYPE", false, false)
-      properties
-        .validateString(s"$SCHEMA.$i.$SCHEMA_FROM", true, 1)
-      // either proctime or rowtime
-      val proctime = s"$SCHEMA.$i.$SCHEMA_PROCTIME"
-      val rowtime = s"$SCHEMA.$i.$ROWTIME"
-      if (properties.containsKey(proctime)) {
-        // check the environment
-        if (!isStreamEnvironment) {
-          throw new ValidationException(
-            s"Property '$proctime' is not allowed in a batch environment.")
-        }
-        // check for only one proctime attribute
-        else if (proctimeFound) {
-          throw new ValidationException("A proctime attribute must only be defined once.")
-        }
-        // check proctime
-        properties.validateBoolean(proctime, false)
-        proctimeFound = properties.getBoolean(proctime)
-        // no rowtime
-        properties.validatePrefixExclusion(rowtime)
-      } else if (properties.hasPrefix(rowtime)) {
-        // check rowtime
-        val rowtimeValidator = new RowtimeValidator(
-          supportsSourceTimestamps,
-          supportsSourceWatermarks,
-          s"$SCHEMA.$i.")
-        rowtimeValidator.validate(properties)
-        // no proctime
-        properties.validateExclusion(proctime)
-      }
-    }
-  }
-}
-
-object SchemaValidator {
-
-  /**
-    * Returns keys for a
-    * [[org.apache.flink.table.factories.TableFormatFactory.supportedProperties()]] method that
-    * are accepted for schema derivation using [[deriveFormatFields(DescriptorProperties)]].
-    */
-  def getSchemaDerivationKeys: util.List[String] = {
-    val keys = new util.ArrayList[String]()
-
-    // schema
-    keys.add(SCHEMA + ".#." + SCHEMA_TYPE)
-    keys.add(SCHEMA + ".#." + SCHEMA_NAME)
-    keys.add(SCHEMA + ".#." + SCHEMA_FROM)
-
-    // time attributes
-    keys.add(SCHEMA + ".#." + SCHEMA_PROCTIME)
-    keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE)
-    keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM)
-    keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS)
-    keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED)
-    keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE)
-    keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS)
-    keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED)
-    keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY)
-
-    keys
-  }
-
-  // utilities
-
-  /**
-    * Finds the proctime attribute if defined.
-    */
-  def deriveProctimeAttribute(properties: DescriptorProperties): Optional[String] = {
-    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
-
-    for (i <- 0 until names.size) {
-      val isProctime = toScala(
-        properties.getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME"))
-      isProctime.foreach { isSet =>
-        if (isSet) {
-          return toJava(names.asScala.get(s"$SCHEMA.$i.$SCHEMA_NAME"))
-        }
-      }
-    }
-    toJava(None)
-  }
-
-  /**
-    * Finds the rowtime attributes if defined.
-    */
-  def deriveRowtimeAttributes(properties: DescriptorProperties)
-    : util.List[RowtimeAttributeDescriptor] = {
-
-    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
-
-    var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
-
-    // check for rowtime in every field
-    for (i <- 0 until names.size) {
-      RowtimeValidator
-        .getRowtimeComponents(properties, s"$SCHEMA.$i.")
-        .foreach { case (extractor, strategy) =>
-          // create descriptor
-          attributes += new RowtimeAttributeDescriptor(
-            properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME"),
-            extractor,
-            strategy)
-        }
-    }
-
-    attributes.asJava
-  }
-
-  /**
-    * Derives the table schema for a table sink. A sink ignores a proctime attribute and
-    * needs to track the origin of a rowtime field.
-    *
-    * @deprecated This method combines two separate concepts of table schema and field mapping.
-    *             This should be split into two methods once we have support for
-    *             the corresponding interfaces (see FLINK-9870).
-    */
-  @deprecated
-  def deriveTableSinkSchema(properties: DescriptorProperties): TableSchema = {
-    val builder = TableSchema.builder()
-
-    val schema = properties.getTableSchema(SCHEMA)
-
-    schema.getFieldNames.zip(schema.getFieldTypes).zipWithIndex.foreach { case ((n, t), i) =>
-      val isProctime = properties
-        .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
-        .orElse(false)
-      val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE"
-      val isRowtime = properties.containsKey(tsType)
-      if (!isProctime && !isRowtime) {
-        // check for a aliasing
-        val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
-          .orElse(n)
-        builder.field(fieldName, t)
-      }
-      // only use the rowtime attribute if it references a field
-      else if (isRowtime) {
-        properties.getString(tsType) match {
-          case ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD =>
-            val field = properties.getString(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_FROM")
-            builder.field(field, t)
-
-          // other timestamp strategies require a reverse timestamp extractor to
-          // insert the timestamp into the output
-          case t@_ =>
-            throw new TableException(
-              s"Unsupported rowtime type '$t' for sink table schema. Currently " +
-              s"only '$ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD' is supported for table sinks.")
-        }
-      }
-    }
-
-    builder.build()
-  }
-
-  /**
-    * Finds a table source field mapping.
-    *
-    * @param properties The properties describing a schema.
-    * @param inputType  The input type that a connector and/or format produces. This parameter
-    *                   can be used to resolve a rowtime field against an input field.
-    */
-  def deriveFieldMapping(
-      properties: DescriptorProperties,
-      inputType: Optional[TypeInformation[_]])
-    : util.Map[String, String] = {
-
-    val mapping = mutable.Map[String, String]()
-
-    val schema = properties.getTableSchema(SCHEMA)
-
-    val columnNames = toScala(inputType) match {
-      case Some(composite: CompositeType[_]) => composite.getFieldNames.toSeq
-      case _ => Seq[String]()
-    }
-
-    // add all source fields first because rowtime might reference one of them
-    columnNames.foreach(name => mapping.put(name, name))
-
-    // add all schema fields first for implicit mappings
-    schema.getFieldNames.foreach { name =>
-      mapping.put(name, name)
-    }
-
-    val names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME)
-
-    for (i <- 0 until names.size) {
-      val name = properties.getString(s"$SCHEMA.$i.$SCHEMA_NAME")
-      toScala(properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")) match {
-
-        // add explicit mapping
-        case Some(source) =>
-          mapping.put(name, source)
-
-        // implicit mapping or time
-        case None =>
-          val isProctime = properties
-            .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
-            .orElse(false)
-          val isRowtime = properties
-            .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
-          // remove proctime/rowtime from mapping
-          if (isProctime || isRowtime) {
-            mapping.remove(name)
-          }
-          // check for invalid fields
-          else if (!columnNames.contains(name)) {
-            throw new ValidationException(s"Could not map the schema field '$name' to a field " +
-              s"from source. Please specify the source field from which it can be derived.")
-          }
-      }
-    }
-
-    mapping.toMap.asJava
-  }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
index 1365eae..3aa114e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sinks/CsvTableSinkFactoryBase.scala
@@ -70,10 +70,7 @@ abstract class CsvTableSinkFactoryBase extends TableFactory {
     // validate
     new FileSystemValidator().validate(params)
     new OldCsvValidator().validate(params)
-    new SchemaValidator(
-      isStreaming,
-      supportsSourceTimestamps = false,
-      supportsSourceWatermarks = false).validate(params)
+    new SchemaValidator(isStreaming, false, false).validate(params)
 
     // build
     val formatSchema = params.getTableSchema(FORMAT_FIELDS)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
index 41c9698..a7a48bb 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/sources/CsvTableSourceFactoryBase.scala
@@ -75,10 +75,7 @@ abstract class CsvTableSourceFactoryBase extends TableFactory {
     // validate
     new FileSystemValidator().validate(params)
     new OldCsvValidator().validate(params)
-    new SchemaValidator(
-      isStreaming,
-      supportsSourceTimestamps = false,
-      supportsSourceWatermarks = false).validate(params)
+    new SchemaValidator(isStreaming, false, false).validate(params)
 
     // build
     val csvTableSourceBuilder = new CsvTableSource.Builder
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
index 0f51c76..8bd327c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
@@ -71,7 +71,7 @@ class RowtimeTest extends DescriptorTestBase {
   }
 
   override def validator(): DescriptorValidator = {
-    new RowtimeValidator(supportsSourceTimestamps = true, supportsSourceWatermarks = false)
+    new RowtimeValidator(true, false)
   }
 
   override def properties(): util.List[util.Map[String, String]] = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
index 8bf9709..3ab1a67 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/SchemaTest.scala
@@ -61,10 +61,7 @@ class SchemaTest extends DescriptorTestBase {
   }
 
   override def validator(): DescriptorValidator = {
-    new SchemaValidator(
-      isStreamEnvironment = true,
-      supportsSourceTimestamps = true,
-      supportsSourceWatermarks = true)
+    new SchemaValidator(true, true, true)
   }
 
   override def properties(): util.List[util.Map[String, String]] = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
index 28f68d5..6de7525 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/InMemoryTableFactory.scala
@@ -51,10 +51,7 @@ class InMemoryTableFactory(terminationCount: Int)
     params.putProperties(properties)
 
     // validate
-    new SchemaValidator(
-      isStreamEnvironment = true,
-      supportsSourceTimestamps = true,
-      supportsSourceWatermarks = true).validate(params)
+    new SchemaValidator(true, true, true).validate(params)
 
     val tableSchema = SchemaValidator.deriveTableSinkSchema(params)
 
@@ -71,10 +68,7 @@ class InMemoryTableFactory(terminationCount: Int)
     params.putProperties(properties)
 
     // validate
-    new SchemaValidator(
-      isStreamEnvironment = true,
-      supportsSourceTimestamps = true,
-      supportsSourceWatermarks = true).validate(params)
+    new SchemaValidator(true, true, true).validate(params)
 
     val tableSchema = params.getTableSchema(SCHEMA)