You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/09 12:36:26 UTC

[flink] 04/06: [hotfix][table-common] Validate fields of row type

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0a03f1c49c97ae428db446043670b7dfea48d307
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 7 15:34:24 2019 +0200

    [hotfix][table-common] Validate fields of row type
---
 .../apache/flink/table/types/logical/RowType.java  | 26 ++++++++++++++++++++--
 .../apache/flink/table/types/LogicalTypesTest.java | 21 +++++++++++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
index 24630f0..3f6147e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
@@ -19,8 +19,10 @@
 package org.apache.flink.table.types.logical;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
 
@@ -41,8 +43,8 @@ import java.util.stream.Collectors;
  * complex structures.
  *
  * <p>The serialized string representation is {@code ROW<n0 t0 'd0', n1 t1 'd1', ...>} where
- * {@code n} is the name of a field, {@code t} is the logical type of a field, {@code d} is the description
- * of a field.
+ * {@code n} is the unique name of a field, {@code t} is the logical type of a field, {@code d} is
+ * the description of a field.
  */
 @PublicEvolving
 public final class RowType extends LogicalType {
@@ -149,6 +151,8 @@ public final class RowType extends LogicalType {
 		this.fields = Collections.unmodifiableList(
 			new ArrayList<>(
 				Preconditions.checkNotNull(fields, "Fields must not be null.")));
+
+		validateFields(fields);
 	}
 
 	public RowType(List<RowField> fields) {
@@ -231,4 +235,22 @@ public final class RowType extends LogicalType {
 	public int hashCode() {
 		return Objects.hash(super.hashCode(), fields);
 	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Throws a ValidationException for blank field names or for names that occur more than once. */
+	private static void validateFields(List<RowField> fields) {
+		final List<String> fieldNames = fields.stream().map(f -> f.name).collect(Collectors.toList());
+		if (fieldNames.stream().anyMatch(StringUtils::isNullOrWhitespaceOnly)) {
+			throw new ValidationException("Field names must contain at least one non-whitespace character.");
+		}
+		final Set<String> duplicates = fieldNames.stream()
+			.collect(Collectors.groupingBy(n -> n, Collectors.counting())) // one counting pass, no rescan per name
+			.entrySet().stream().filter(e -> e.getValue() > 1).map(e -> e.getKey())
+			.collect(Collectors.toSet());
+		if (!duplicates.isEmpty()) {
+			throw new ValidationException(
+				String.format("Field names must be unique. Found duplicates: %s", duplicates));
+		}
+	}
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index b8b38c5..60f37b2 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.logical.AnyType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
@@ -68,6 +69,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Test for subclasses of {@link org.apache.flink.table.types.logical.LogicalType}.
@@ -401,6 +403,25 @@ public class LogicalTypesTest {
 					new RowType.RowField("a", new VarCharType(), "Different desc."),
 					new RowType.RowField("b`", new TimestampType())))
 		);
+
+		try {
+			new RowType(
+				Arrays.asList(
+					new RowType.RowField("b", new VarCharType()),
+					new RowType.RowField("b", new VarCharType()),
+					new RowType.RowField("a", new VarCharType()),
+					new RowType.RowField("a", new TimestampType())));
+			fail("Not unique fields expected.");
+		} catch (ValidationException e) {
+			// ok
+		}
+
+		try {
+			new RowType(Collections.singletonList(new RowType.RowField("", new VarCharType())));
+			fail("Invalid name.");
+		} catch (ValidationException e) {
+			// ok
+		}
 	}
 
 	@Test