You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/09 12:36:26 UTC
[flink] 04/06: [hotfix][table-common] Validate fields of row type
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0a03f1c49c97ae428db446043670b7dfea48d307
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 7 15:34:24 2019 +0200
[hotfix][table-common] Validate fields of row type
---
.../apache/flink/table/types/logical/RowType.java | 26 ++++++++++++++++++++--
.../apache/flink/table/types/LogicalTypesTest.java | 21 +++++++++++++++++
2 files changed, 45 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
index 24630f0..3f6147e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/RowType.java
@@ -19,8 +19,10 @@
package org.apache.flink.table.types.logical;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
import javax.annotation.Nullable;
@@ -41,8 +43,8 @@ import java.util.stream.Collectors;
* complex structures.
*
* <p>The serialized string representation is {@code ROW<n0 t0 'd0', n1 t1 'd1', ...>} where
- * {@code n} is the name of a field, {@code t} is the logical type of a field, {@code d} is the description
- * of a field.
+ * {@code n} is the unique name of a field, {@code t} is the logical type of a field, {@code d} is
+ * the description of a field.
*/
@PublicEvolving
public final class RowType extends LogicalType {
@@ -149,6 +151,8 @@ public final class RowType extends LogicalType {
this.fields = Collections.unmodifiableList(
new ArrayList<>(
Preconditions.checkNotNull(fields, "Fields must not be null.")));
+
+ validateFields(fields);
}
public RowType(List<RowField> fields) {
@@ -231,4 +235,22 @@ public final class RowType extends LogicalType {
public int hashCode() {
return Objects.hash(super.hashCode(), fields);
}
+
+ // --------------------------------------------------------------------------------------------
+
+ private static void validateFields(List<RowField> fields) {
+ final List<String> fieldNames = fields.stream()
+ .map(f -> f.name)
+ .collect(Collectors.toList());
+ if (fieldNames.stream().anyMatch(StringUtils::isNullOrWhitespaceOnly)) {
+ throw new ValidationException("Field names must contain at least one non-whitespace character.");
+ }
+ final Set<String> duplicates = fieldNames.stream()
+ .filter(n -> Collections.frequency(fieldNames, n) > 1)
+ .collect(Collectors.toSet());
+ if (!duplicates.isEmpty()) {
+ throw new ValidationException(
+ String.format("Field names must be unique. Found duplicates: %s", duplicates));
+ }
+ }
}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
index b8b38c5..60f37b2 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.AnyType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
@@ -68,6 +69,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Test for subclasses of {@link org.apache.flink.table.types.logical.LogicalType}.
@@ -401,6 +403,25 @@ public class LogicalTypesTest {
new RowType.RowField("a", new VarCharType(), "Different desc."),
new RowType.RowField("b`", new TimestampType())))
);
+
+ try {
+ new RowType(
+ Arrays.asList(
+ new RowType.RowField("b", new VarCharType()),
+ new RowType.RowField("b", new VarCharType()),
+ new RowType.RowField("a", new VarCharType()),
+ new RowType.RowField("a", new TimestampType())));
+ fail("Not unique fields expected.");
+ } catch (ValidationException e) {
+ // ok
+ }
+
+ try {
+ new RowType(Collections.singletonList(new RowType.RowField("", new VarCharType())));
+ fail("Invalid name.");
+ } catch (ValidationException e) {
+ // ok
+ }
}
@Test