You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/11/01 07:02:05 UTC

[GitHub] [flink] wuchong commented on a change in pull request #9994: [FLINK-14322][table-api] Add watermark information in TableSchema

wuchong commented on a change in pull request #9994: [FLINK-14322][table-api] Add watermark information in TableSchema
URL: https://github.com/apache/flink/pull/9994#discussion_r341469357
 
 

 ##########
 File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
 ##########
 @@ -54,44 +58,64 @@
 
 	private final DataType[] fieldDataTypes;
 
-	private final Map<String, Integer> fieldNameToIndex;
+	/** Mapping from qualified field name to (nested) field type. */
+	private final Map<String, DataType> typesByName;
 
-	private TableSchema(String[] fieldNames, DataType[] fieldDataTypes) {
+	private final List<WatermarkSpec> watermarkSpecs;
+
+	private TableSchema(String[] fieldNames, DataType[] fieldDataTypes, List<WatermarkSpec> watermarkSpecs) {
 		this.fieldNames = Preconditions.checkNotNull(fieldNames);
 		this.fieldDataTypes = Preconditions.checkNotNull(fieldDataTypes);
+		this.watermarkSpecs = Preconditions.checkNotNull(watermarkSpecs);
 
 		if (fieldNames.length != fieldDataTypes.length) {
-			throw new TableException(
+			throw new ValidationException(
 				"Number of field names and field data types must be equal.\n" +
 					"Number of names is " + fieldNames.length + ", number of data types is " + fieldDataTypes.length + ".\n" +
 					"List of field names: " + Arrays.toString(fieldNames) + "\n" +
 					"List of field data types: " + Arrays.toString(fieldDataTypes));
 		}
 
-		// validate and create name to index mapping
-		fieldNameToIndex = new HashMap<>();
-		final Set<String> duplicateNames = new HashSet<>();
-		final Set<String> uniqueNames = new HashSet<>();
+		// validate and create name to type mapping
+		typesByName = new HashMap<>();
 		for (int i = 0; i < fieldNames.length; i++) {
 			// check for null
-			Preconditions.checkNotNull(fieldDataTypes[i]);
-			final String fieldName = Preconditions.checkNotNull(fieldNames[i]);
-
-			// collect indices
-			fieldNameToIndex.put(fieldName, i);
+			DataType fieldType = Preconditions.checkNotNull(fieldDataTypes[i]);
+			String fieldName = Preconditions.checkNotNull(fieldNames[i]);
+			validateAndCreateNameTypeMapping(fieldName, fieldType, "");
+		}
 
-			// check uniqueness of field names
-			if (uniqueNames.contains(fieldName)) {
-				duplicateNames.add(fieldName);
-			} else {
-				uniqueNames.add(fieldName);
+		// validate watermark and rowtime attribute
+		for (WatermarkSpec watermark : watermarkSpecs) {
+			String rowtimeAttribute = watermark.getRowtimeAttribute();
+			DataType rowtimeType = getFieldDataType(rowtimeAttribute)
+				.orElseThrow(() -> new ValidationException(String.format(
+					"Rowtime attribute '%s' is not defined in schema.", rowtimeAttribute)));
+			if (rowtimeType.getLogicalType().getTypeRoot() != TIMESTAMP_WITHOUT_TIME_ZONE) {
+				throw new ValidationException(String.format(
+					"Rowtime attribute '%s' must be of type TIMESTAMP but is of type '%s'.",
+					rowtimeAttribute, rowtimeType));
+			}
+			LogicalType watermarkOutputType = watermark.getWatermarkStrategy()
+				.getOutputDataType().getLogicalType();
+			if (watermarkOutputType.getTypeRoot() != TIMESTAMP_WITHOUT_TIME_ZONE) {
+				throw new ValidationException(String.format(
+					"Watermark strategy '%s' must be of type TIMESTAMP but is of type '%s'.",
+					watermark.getWatermarkStrategy().asSerializableString(),
+					watermarkOutputType.asSerializableString()));
 			}
 		}
-		if (!duplicateNames.isEmpty()) {
-			throw new TableException(
-				"Field names must be unique.\n" +
-					"List of duplicate fields: " + duplicateNames.toString() + "\n" +
-					"List of all fields: " + Arrays.toString(fieldNames));
+	}
+
+	private void validateAndCreateNameTypeMapping(String fieldName, DataType fieldType, String prefixQualifiedName) {
+		String fullFieldName = prefixQualifiedName.isEmpty() ? fieldName : prefixQualifiedName + "." + fieldName;
 
 Review comment:
   I think the logic of full name generation is simple (just one line), there is no need to extract a method for it. I can add some javadocs to describe the meaning of `prefixQualifiedName`.
   
   This method is not just creating a mapping, but also does some validation. I think having a "validate" in the method name will be easier to understand. 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services