Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/27 03:51:59 UTC

[GitHub] [flink] godfreyhe opened a new pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

godfreyhe opened a new pull request #12335:
URL: https://github.com/apache/flink/pull/12335


   
   
   ## What is the purpose of the change
   
   *Currently, DDL + Table API (with window) does not work, because the source table's TableSchema (from the TableEnvironment#scan/from methods) does not convert the watermark spec to a rowtime attribute. This PR aims to fix that.*
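
   A minimal sketch of the failing scenario (table name, schema, and the `datagen` connector are illustrative, not taken from this PR):

   ```
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.Table;
   import org.apache.flink.table.api.TableEnvironment;
   import org.apache.flink.table.api.Tumble;

   public class WatermarkDdlExample {
   	public static void main(String[] args) {
   		TableEnvironment tEnv = TableEnvironment.create(
   				EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

   		// The DDL defines a watermark, which should turn 'ts' into a rowtime attribute.
   		tEnv.sqlUpdate(
   				"CREATE TABLE orders (" +
   				"  amount BIGINT," +
   				"  ts TIMESTAMP(3)," +
   				"  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
   				") WITH ('connector' = 'datagen')");

   		// Before this fix, the schema returned by from() lost the rowtime
   		// attribute, so the event-time window below failed to resolve 'ts'.
   		Table result = tEnv.from("orders")
   				.window(Tumble.over("10.minutes").on("ts").as("w"))
   				.groupBy("w")
   				.select("amount.sum");
   		result.printSchema();
   	}
   }
   ```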
   
   
   ## Brief change log
   
  - *Convert the watermark spec to rowtime attributes when executing TableEnvironment#scan/from*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
  - *Extended TableSourceTest to validate the fix*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r435953384



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.Arrays;
+
+/**
+ * The {@link CatalogTableSchemaResolver} is used to derive the correct result type of a computed column,
+ * because the data type of a computed column from a catalog table is not trusted.
+ *
+ * <p>For example, for the `proctime()` function, its type in the given TableSchema is Timestamp(3),
+ * but its correct type is Timestamp(3) *PROCTIME*.
+ */
+@Internal
+public class CatalogTableSchemaResolver {
+	public final Parser parser;
+
+	public CatalogTableSchemaResolver(Parser parser) {
+		this.parser = parser;
+	}
+
+	/**
+	 * Resolve the computed column's type for the given schema.
+	 *
+	 * @param tableSchema Table schema to derive table field names and data types
+	 * @param isStreamingMode Flag to determine whether the schema of a stream or batch table is created
+	 * @return the resolved TableSchema
+	 */
+	public TableSchema resolve(TableSchema tableSchema, boolean isStreamingMode) {
+		final String rowtime;
+		if (!tableSchema.getWatermarkSpecs().isEmpty()) {
+			// TODO: [FLINK-14473] we only support top-level rowtime attribute right now
+			rowtime = tableSchema.getWatermarkSpecs().get(0).getRowtimeAttribute();
+			if (rowtime.contains(".")) {
+				throw new ValidationException(
+						String.format("Nested field '%s' as rowtime attribute is not supported right now.", rowtime));
+			}
+		} else {
+			rowtime = null;
+		}
+
+		String[] fieldNames = tableSchema.getFieldNames();
+		DataType[] fieldTypes = Arrays.copyOf(tableSchema.getFieldDataTypes(), tableSchema.getFieldCount());
+
+		for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
+			TableColumn tableColumn = tableSchema.getTableColumns().get(i);
+			if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
+				if (fieldNames[i].equals(rowtime)) {
+					throw new TableException("proctime can't be defined on watermark spec.");
+				}
+				TimestampType originalType = (TimestampType) fieldTypes[i].getLogicalType();
+				LogicalType proctimeType = new TimestampType(
+						originalType.isNullable(),
+						TimestampKind.PROCTIME,
+						originalType.getPrecision());
+				fieldTypes[i] = TypeConversions.fromLogicalToDataType(proctimeType);
+			} else if (isStreamingMode && fieldNames[i].equals(rowtime)) {

Review comment:
       Can't we do what we are doing here with the flag in the `LegacyCatalogSourceTable`? My reasoning is that we should push the `isStreamingMode` to the planner as much as possible, so that it can make such decisions based on properties of the source. 
   
   Making that distinction in the API moves us away from that goal by binding the API to a particular mode. IMO, in the end there should be no distinction between stream and batch in the API.







[GitHub] [flink] dawidwys commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r435951724



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -490,8 +490,11 @@ private void insertIntoInternal(UnresolvedIdentifier unresolvedIdentifier, Table
 	private Optional<CatalogQueryOperation> scanInternal(UnresolvedIdentifier identifier) {
 		ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
 
-		return catalogManager.getTable(tableIdentifier)
-			.map(t -> new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema()));
+		return catalogManager.getTable(tableIdentifier).map(t -> {
+			CatalogTableSchemaResolver resolver = new CatalogTableSchemaResolver(parser);

Review comment:
       I had a look at #12260 and my feeling is even stronger that we are fixing it everywhere but not where it actually should be fixed. IMO, if we fixed it when the Table is created and had the proper types from the very beginning, we would not need to patch it everywhere.
   
   I have the feeling that the problem here is that the type we store in the catalog is wrong. I looked into it and my take is that the TimestampType#kind should also be stored in the catalog, and this would fix most of the problems. Right now we are basically stripping that information when storing it in the catalog. I am not 100% sure if we should store the time attribute property, but I think it would fix the hacks in this PR and #12260, IMO. I'd like to know what @twalthr thinks about it.







[GitHub] [flink] godfreyhe commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r435773307



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -490,8 +490,11 @@ private void insertIntoInternal(UnresolvedIdentifier unresolvedIdentifier, Table
 	private Optional<CatalogQueryOperation> scanInternal(UnresolvedIdentifier identifier) {
 		ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
 
-		return catalogManager.getTable(tableIdentifier)
-			.map(t -> new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema()));
+		return catalogManager.getTable(tableIdentifier).map(t -> {
+			CatalogTableSchemaResolver resolver = new CatalogTableSchemaResolver(parser);

Review comment:
       We can fix the `rowtime` type in `TableSchema` based on the `WatermarkSpec`, but we can't fix the `proctime` type unless we just use a string comparison (`proctime()`). As discussed in https://github.com/apache/flink/pull/12260, we should use `SqlExprToRexConverter` to get the correct type. For the Table API, this is the entry point to create a Flink `Table`; for SQL, `CatalogSchemaTable` is the entry point to create a Calcite `Table`.
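   
   For reference, the `isProctimeType` helper called in the quoted resolver is not shown in the diff hunks; a minimal sketch of what it might look like, assuming it delegates to the `Parser#parseSqlExpression` entry point added by this PR:
   ```
   // Hypothetical sketch of isProctimeType (its body is not part of the
   // quoted hunks): parse the computed column expression against the schema
   // and inspect the timestamp kind of the resolved type.
   private boolean isProctimeType(String expr, TableSchema tableSchema) {
   	ResolvedExpression resolvedExpr = parser.parseSqlExpression(expr, tableSchema);
   	if (resolvedExpr == null) {
   		return false;
   	}
   	LogicalType type = resolvedExpr.getOutputDataType().getLogicalType();
   	return type instanceof TimestampType
   			&& ((TimestampType) type).getKind() == TimestampKind.PROCTIME;
   }
   ```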







[GitHub] [flink] flinkbot edited a comment on pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12335:
URL: https://github.com/apache/flink/pull/12335#issuecomment-633921246


   ## CI report:
   
   * 1d664d2df51b688cd74ff6bb3f4102419de68ecd Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2838) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dawidwys commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r436677768



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
##########
@@ -146,6 +152,10 @@ public CatalogManager build() {
 		}
 	}
 
+	public void setCatalogTableSchemaResolver(CatalogTableSchemaResolver schemaResolver) {

Review comment:
       Argh, I missed that dependency. I can't think of a better solution now. Let's then go with the `setter` approach. Could you add a short comment on the setter explaining why we have it instead of passing it in the ctor?
   
   I think the first paragraph from your answer would work:
   ```
   /**
   * We do not pass it in the ctor, because we need a {@link Parser} that is constructed in a
   * {@link Planner}.  At the same time {@link Planner} needs a {@link CatalogManager} to
   * be constructed. Thus we can't get {@link Parser} instance when creating a {@link 
   * CatalogManager}. See {@link TableEnvironmentImpl#create}.
   */
   ```

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
##########
@@ -58,7 +58,7 @@
 	UnresolvedIdentifier parseIdentifier(String identifier);
 
 	/**
-	 * Entry point for parse sql expression expressed as a String.
+	 * Entry point for parsing sql expression expressed as a String.

Review comment:
       ```suggestion
   	 * Entry point for parsing SQL expressions expressed as a String.
   ```







[GitHub] [flink] flinkbot edited a comment on pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12335:
URL: https://github.com/apache/flink/pull/12335#issuecomment-633921246


   ## CI report:
   
   * 8b5dca51c1f186a6a11c590ed015a45f882f06c5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2814) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12335:
URL: https://github.com/apache/flink/pull/12335#issuecomment-633921246


   ## CI report:
   
   * 8b5dca51c1f186a6a11c590ed015a45f882f06c5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2814) 
   * 1d664d2df51b688cd74ff6bb3f4102419de68ecd UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] godfreyhe commented on pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on pull request #12335:
URL: https://github.com/apache/flink/pull/12335#issuecomment-633912374


   cc @wuchong 





[GitHub] [flink] flinkbot edited a comment on pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12335:
URL: https://github.com/apache/flink/pull/12335#issuecomment-633921246


   ## CI report:
   
   * 1d664d2df51b688cd74ff6bb3f4102419de68ecd Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2838) 
   * 527557dcd66932baaa79827fe2cfcc974f1aab40 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2868) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] godfreyhe commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r436651750



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
##########
@@ -146,6 +152,10 @@ public CatalogManager build() {
 		}
 	}
 
+	public void setCatalogTableSchemaResolver(CatalogTableSchemaResolver schemaResolver) {

Review comment:
       I also want to pass it in the ctor, but the `Parser` is constructed in the `Planner`, while the `Planner` needs a `CatalogManager` to be constructed, so we can't get a `Parser` instance when creating the `CatalogManager`; see `TableEnvironmentImpl#create`.
   
   A workaround is to use an `AtomicReference` to hold the parser instance; the code would look like:
   ```
   public static TableEnvironmentImpl create(EnvironmentSettings settings) {
   
   		// temporary solution until FLINK-15635 is fixed
   		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
   
   		TableConfig tableConfig = new TableConfig();
   
   		ModuleManager moduleManager = new ModuleManager();
   
   		// create a parser reference first
   		AtomicReference<Parser> parserRef = new AtomicReference<>();
   		CatalogManager catalogManager = CatalogManager.newBuilder()
   			.classLoader(classLoader)
   			.config(tableConfig.getConfiguration())
   			.defaultCatalog(
   				settings.getBuiltInCatalogName(),
   				new GenericInMemoryCatalog(
   					settings.getBuiltInCatalogName(),
   					settings.getBuiltInDatabaseName()))
   			// set a CatalogTableSchemaResolver Supplier instead of a CatalogTableSchemaResolver
   			.schemaResolverSupplier(() -> new CatalogTableSchemaResolver(parserRef.get(), settings.isStreamingMode()))
   			.build();
   
   		FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
   
   		Map<String, String> executorProperties = settings.toExecutorProperties();
   		Executor executor = ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
   			.create(executorProperties);
   
   		Map<String, String> plannerProperties = settings.toPlannerProperties();
   		Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
   			.create(
   				plannerProperties,
   				executor,
   				tableConfig,
   				functionCatalog,
   				catalogManager);
   		// set the parser reference
   		parserRef.set(planner.getParser());
   
   		return new TableEnvironmentImpl(
   			catalogManager,
   			moduleManager,
   			tableConfig,
   			executor,
   			functionCatalog,
   			planner,
   			settings.isStreamingMode()
   		);
   	}
   ```
   
       Users are very confused about how to create a `StreamTableEnvironmentImpl` (I know some users use `StreamTableEnvironmentImpl`'s ctor to create a `StreamTableEnvironment` instead of using the create method).
   
   







[GitHub] [flink] flinkbot edited a comment on pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12335:
URL: https://github.com/apache/flink/pull/12335#issuecomment-633921246


   ## CI report:
   
   * 1630336016f87ea92bb1186b0522bdcf0dca4fb3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2777) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>






[GitHub] [flink] dawidwys commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r436503474



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Parser.java
##########
@@ -54,4 +56,14 @@
 	 * @throws org.apache.flink.table.api.SqlParserException when failed to parse the identifier
 	 */
 	UnresolvedIdentifier parseIdentifier(String identifier);
+
+	/**
+	 * Entry point for parse sql expression expressed as a String.

Review comment:
       ```suggestion
   	 * Entry point for parsing SQL expressions expressed as a String.
   ```

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
##########
@@ -74,4 +94,20 @@ public UnresolvedIdentifier parseIdentifier(String identifier) {
 		SqlIdentifier sqlIdentifier = parser.parseIdentifier(identifier);
 		return UnresolvedIdentifier.of(sqlIdentifier.names);
 	}
+
+	@Override
+	public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
+		FlinkTypeFactory typeFactory = typeFactorySupplier.get();
+		List<String> fieldNames = Arrays.asList(inputSchema.getFieldNames());
+		List<LogicalType> fieldTypes = Arrays.stream(inputSchema.getFieldDataTypes())
+				.map(LogicalTypeDataTypeConverter::toLogicalType)
+				.collect(Collectors.toList());
+		RelDataType inputType = typeFactory.buildRelNodeRowType(

Review comment:
       Could we change this method so that we do not need the `typeFactory`? Could we, e.g., instead of a `Supplier<SqlExprToRexConverterFactory>` pass a `Function<TableSchema, SqlExprToRexConverter>`?
   
   I'd prefer to limit the number of different cross-dependencies unless strictly necessary.
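   
   A rough sketch of what I mean (field and method shapes are illustrative, not an existing API):
   ```
   // Sketch only: ParserImpl would receive a per-schema converter factory,
   // so it never has to touch the FlinkTypeFactory itself.
   private final Function<TableSchema, SqlExprToRexConverter> sqlExprConverterFactory;
   
   @Override
   public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
   	SqlExprToRexConverter converter = sqlExprConverterFactory.apply(inputSchema);
   	RexNode rexNode = converter.convertToRexNodes(new String[]{sqlExpression})[0];
   	LogicalType logicalType = FlinkTypeFactory.toLogicalType(rexNode.getType());
   	return new RexNodeExpression(rexNode, TypeConversions.fromLogicalToDataType(logicalType));
   }
   ```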

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
##########
@@ -74,4 +94,20 @@ public UnresolvedIdentifier parseIdentifier(String identifier) {
 		SqlIdentifier sqlIdentifier = parser.parseIdentifier(identifier);
 		return UnresolvedIdentifier.of(sqlIdentifier.names);
 	}
+
+	@Override
+	public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
+		FlinkTypeFactory typeFactory = typeFactorySupplier.get();
+		List<String> fieldNames = Arrays.asList(inputSchema.getFieldNames());
+		List<LogicalType> fieldTypes = Arrays.stream(inputSchema.getFieldDataTypes())
+				.map(LogicalTypeDataTypeConverter::toLogicalType)

Review comment:
       Why not `DataType::getLogicalType`? Why do we even need the `LogicalTypeDataTypeConverter::toLogicalType`?
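   
   For illustration, the stream would then simply read:
   ```
   List<LogicalType> fieldTypes = Arrays.stream(inputSchema.getFieldDataTypes())
   		.map(DataType::getLogicalType)
   		.collect(Collectors.toList());
   ```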

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/ParserImpl.java
##########
@@ -78,4 +80,10 @@ public UnresolvedIdentifier parseIdentifier(String identifier) {
 		SqlIdentifier sqlIdentifier = parser.parseIdentifier(identifier);
 		return UnresolvedIdentifier.of(sqlIdentifier.names);
 	}
+
+	@Override
+	public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
+		// do not support for old planner
+		return null;

Review comment:
       Let's throw an exception instead.
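   
   For example, something along these lines (the exact message is just a suggestion):
   ```
   @Override
   public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
   	throw new UnsupportedOperationException(
   			"Parsing SQL expressions is not supported by the old planner.");
   }
   ```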

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ParserImpl.java
##########
@@ -74,4 +94,20 @@ public UnresolvedIdentifier parseIdentifier(String identifier) {
 		SqlIdentifier sqlIdentifier = parser.parseIdentifier(identifier);
 		return UnresolvedIdentifier.of(sqlIdentifier.names);
 	}
+
+	@Override
+	public ResolvedExpression parseSqlExpression(String sqlExpression, TableSchema inputSchema) {
+		FlinkTypeFactory typeFactory = typeFactorySupplier.get();
+		List<String> fieldNames = Arrays.asList(inputSchema.getFieldNames());
+		List<LogicalType> fieldTypes = Arrays.stream(inputSchema.getFieldDataTypes())
+				.map(LogicalTypeDataTypeConverter::toLogicalType)
+				.collect(Collectors.toList());
+		RelDataType inputType = typeFactory.buildRelNodeRowType(
+				JavaScalaConversionUtil$.MODULE$.toScala(fieldNames),

Review comment:
       Could we rather add a method to the `FlinkTypeFactory` that could work with Java? This hack really does not look nice.
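   
   For illustration, with a hypothetical Java-friendly overload of `buildRelNodeRowType` that accepts `java.util.List` directly, the call site would shrink to:
   ```
   RelDataType inputType = typeFactory.buildRelNodeRowType(fieldNames, fieldTypes);
   ```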

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.scala
##########
@@ -76,7 +77,9 @@ class TableSourceTest extends TableTestBase {
   }
 
   @Test
-  def testProcTimeTableSourceSimple(): Unit = {
+  def testProctimeOnWatermarkSpec(): Unit = {
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage("proctime can't be defined on watermark spec.")

Review comment:
       Shouldn't the message rather be: `Watermark can not be defined for a processing time attribute column`?

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
##########
@@ -146,6 +152,10 @@ public CatalogManager build() {
 		}
 	}
 
+	public void setCatalogTableSchemaResolver(CatalogTableSchemaResolver schemaResolver) {

Review comment:
       How about we pass it in the ctor? It is not an optional mutable parameter.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+/**
+ * The {@link CatalogTableSchemaResolver} is used to derive the correct result type of a computed column,
+ * because the data type of a computed column from a catalog table is not trusted.
+ *
+ * <p>For example, for the `proctime()` function, its type in the given TableSchema is Timestamp(3),
+ * but its correct type is Timestamp(3) *PROCTIME*.
+ */
+@Internal
+public class CatalogTableSchemaResolver {
+	private final Parser parser;
+	private final boolean isStreamingMode;
+
+	public CatalogTableSchemaResolver(Parser parser, boolean isStreamingMode) {
+		this.parser = parser;
+		this.isStreamingMode = isStreamingMode;
+	}
+
+	/**
+	 * Resolve the computed column's type for the given schema.
+	 *
+	 * @param tableSchema Table schema to derive table field names and data types
+	 * @return the resolved TableSchema
+	 */
+	public TableSchema resolve(TableSchema tableSchema) {
+		final String rowtime;
+		if (!tableSchema.getWatermarkSpecs().isEmpty()) {
+			// TODO: [FLINK-14473] we only support top-level rowtime attribute right now

Review comment:
       Not a comment for this issue/PR, but rather for the `WatermarkSpec`: it is very error-prone, IMO, to use a single string for a rowtime attribute if it is supposed to handle nested columns.
   
   Just as an example: this will break if the dot is escaped.
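   
   A hypothetical illustration (column names made up): a nested field `b` inside a column `a` and a top-level column literally named `a.b` both end up as the same rowtime attribute string, so the `rowtime.contains(".")` check cannot tell them apart:
   ```
   // Both cases yield the rowtime attribute string "a.b", so resolve() would
   // reject an escaped top-level column name as if it were a nested field.
   DataType watermarkType = DataTypes.TIMESTAMP(3);
   WatermarkSpec spec = new WatermarkSpec("a.b", "a.b - INTERVAL '5' SECOND", watermarkType);
   ```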

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.Arrays;
+
+/**
+ * The {@link CatalogTableSchemaResolver} is used to derive the correct result type of a computed column,
+ * because the data type of a computed column from a catalog table is not trusted.
+ *
+ * <p>For example, for the `proctime()` function, its type in the given TableSchema is Timestamp(3),
+ * but its correct type is Timestamp(3) *PROCTIME*.
+ */
+@Internal
+public class CatalogTableSchemaResolver {
+	public final Parser parser;
+
+	public CatalogTableSchemaResolver(Parser parser) {
+		this.parser = parser;
+	}
+
+	/**
+	 * Resolve the computed column's type for the given schema.
+	 *
+	 * @param tableSchema Table schema to derive table field names and data types
+	 * @param isStreamingMode Flag to determine whether the schema of a stream or batch table is created
+	 * @return the resolved TableSchema
+	 */
+	public TableSchema resolve(TableSchema tableSchema, boolean isStreamingMode) {
+		final String rowtime;
+		if (!tableSchema.getWatermarkSpecs().isEmpty()) {
+			// TODO: [FLINK-14473] we only support top-level rowtime attribute right now
+			rowtime = tableSchema.getWatermarkSpecs().get(0).getRowtimeAttribute();
+			if (rowtime.contains(".")) {
+				throw new ValidationException(
+						String.format("Nested field '%s' as rowtime attribute is not supported right now.", rowtime));
+			}
+		} else {
+			rowtime = null;
+		}
+
+		String[] fieldNames = tableSchema.getFieldNames();
+		DataType[] fieldTypes = Arrays.copyOf(tableSchema.getFieldDataTypes(), tableSchema.getFieldCount());
+
+		for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
+			TableColumn tableColumn = tableSchema.getTableColumns().get(i);
+			if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
+				if (fieldNames[i].equals(rowtime)) {
+					throw new TableException("proctime can't be defined on watermark spec.");
+				}
+				TimestampType originalType = (TimestampType) fieldTypes[i].getLogicalType();
+				LogicalType proctimeType = new TimestampType(
+						originalType.isNullable(),
+						TimestampKind.PROCTIME,
+						originalType.getPrecision());
+				fieldTypes[i] = TypeConversions.fromLogicalToDataType(proctimeType);
+			} else if (isStreamingMode && fieldNames[i].equals(rowtime)) {

Review comment:
       Yes, that is also my thinking: a Table, including its schema, should be the same in both cases, and possibly handled differently in the planner.
   
   Do you think we could create a JIRA ticket to track this effort, if we cannot remove it currently?







[GitHub] [flink] wuchong commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r436004322



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -490,8 +490,11 @@ private void insertIntoInternal(UnresolvedIdentifier unresolvedIdentifier, Table
 	private Optional<CatalogQueryOperation> scanInternal(UnresolvedIdentifier identifier) {
 		ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
 
-		return catalogManager.getTable(tableIdentifier)
-			.map(t -> new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema()));
+		return catalogManager.getTable(tableIdentifier).map(t -> {
+			CatalogTableSchemaResolver resolver = new CatalogTableSchemaResolver(parser);

Review comment:
       Putting it in `CatalogManager#getTable` sounds good to me. We thought to have a single util to resolve the schema, but I think your idea is better: resolving the schema before the CatalogTable is exposed out of the CatalogManager avoids forgetting to resolve the schema.
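   
   A rough sketch of that direction (`lookupTableInternal` and `copyWithSchema` are made-up names for illustration):
   ```
   // Sketch only: resolve the schema at the single place where tables leave
   // the CatalogManager, so no call site can forget to do it.
   public Optional<CatalogBaseTable> getTable(ObjectIdentifier objectIdentifier) {
   	return lookupTableInternal(objectIdentifier)
   			.map(t -> copyWithSchema(t, schemaResolver.resolve(t.getSchema())));
   }
   ```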









[GitHub] [flink] godfreyhe commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r436337888



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.Arrays;
+
+/**
+ * The {@link CatalogTableSchemaResolver} is used to derive the correct result type of a computed column,
+ * because the data type of a computed column from a catalog table is not trusted.
+ *
+ * <p>For example, for the `proctime()` function, its type in the given TableSchema is Timestamp(3),
+ * but its correct type is Timestamp(3) *PROCTIME*.
+ */
+@Internal
+public class CatalogTableSchemaResolver {
+	public final Parser parser;
+
+	public CatalogTableSchemaResolver(Parser parser) {
+		this.parser = parser;
+	}
+
+	/**
+	 * Resolve the computed column's type for the given schema.
+	 *
+	 * @param tableSchema Table schema to derive table field names and data types
+	 * @param isStreamingMode Flag to determine whether the schema of a stream or batch table is created
+	 * @return the resolved TableSchema
+	 */
+	public TableSchema resolve(TableSchema tableSchema, boolean isStreamingMode) {
+		final String rowtime;
+		if (!tableSchema.getWatermarkSpecs().isEmpty()) {
+			// TODO: [FLINK-14473] we only support top-level rowtime attribute right now
+			rowtime = tableSchema.getWatermarkSpecs().get(0).getRowtimeAttribute();
+			if (rowtime.contains(".")) {
+				throw new ValidationException(
+						String.format("Nested field '%s' as rowtime attribute is not supported right now.", rowtime));
+			}
+		} else {
+			rowtime = null;
+		}
+
+		String[] fieldNames = tableSchema.getFieldNames();
+		DataType[] fieldTypes = Arrays.copyOf(tableSchema.getFieldDataTypes(), tableSchema.getFieldCount());
+
+		for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
+			TableColumn tableColumn = tableSchema.getTableColumns().get(i);
+			if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
+				if (fieldNames[i].equals(rowtime)) {
+					throw new TableException("proctime can't be defined on watermark spec.");
+				}
+				TimestampType originalType = (TimestampType) fieldTypes[i].getLogicalType();
+				LogicalType proctimeType = new TimestampType(
+						originalType.isNullable(),
+						TimestampKind.PROCTIME,
+						originalType.getPrecision());
+				fieldTypes[i] = TypeConversions.fromLogicalToDataType(proctimeType);
+			} else if (isStreamingMode && fieldNames[i].equals(rowtime)) {

Review comment:
       I agree with you that we should push the `isStreamingMode` to the planner as much as possible; many classes carry an `isStreamingMode` flag now. I also tried to remove `isStreamingMode` from `CatalogTableSchemaResolver`, but I found we would have to handle the "erase the rowtime type for batch" logic in at least three places (`TableEnvironmentImpl#scanInternal` for the Table API, `CatalogSchemaTable#getRowType` for `CatalogTable`, and `DatabaseCalciteSchema#getTable` for `QueryOperationCatalogView`), because we should make sure the type of a `Table` from the catalog and the type of the `RelNode` expanded from that `Table` (e.g. adding a projection node for a computed column, adding a watermark assigner node for a watermark, expanding different kinds of views) are the same. In the long term, I think we should also keep the rowtime type for batch (e.g. to support rowtime temporal joins); then we can remove `isStreamingMode` from `CatalogTableSchemaResolver`, and a lot of other related logic can be simplified. (It is too complex to handle different kinds of tables and views for both batch and streaming.)







[GitHub] [flink] dawidwys commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r435976235



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -490,8 +490,11 @@ private void insertIntoInternal(UnresolvedIdentifier unresolvedIdentifier, Table
 	private Optional<CatalogQueryOperation> scanInternal(UnresolvedIdentifier identifier) {
 		ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
 
-		return catalogManager.getTable(tableIdentifier)
-			.map(t -> new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema()));
+		return catalogManager.getTable(tableIdentifier).map(t -> {
+			CatalogTableSchemaResolver resolver = new CatalogTableSchemaResolver(parser);

Review comment:
       In the end my reasoning is that this logic should be pushed as close to the catalog as possible. If we cannot push it all the way to the creation of the table, maybe at least we can push it into the single place when we look up that table.







[GitHub] [flink] wuchong commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r435963387



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -490,8 +490,11 @@ private void insertIntoInternal(UnresolvedIdentifier unresolvedIdentifier, Table
 	private Optional<CatalogQueryOperation> scanInternal(UnresolvedIdentifier identifier) {
 		ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
 
-		return catalogManager.getTable(tableIdentifier)
-			.map(t -> new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema()));
+		return catalogManager.getTable(tableIdentifier).map(t -> {
+			CatalogTableSchemaResolver resolver = new CatalogTableSchemaResolver(parser);

Review comment:
       We had a long discussion in FLINK-17189 and came to the conclusion that the TimestampType#kind shouldn't be stored in the catalog. IMO, there are several reasons:
   
   1) The metadata stored in the catalog should simply capture the information of the DDL. In other words, we should only store the pure DDL in the catalog.
   2) The result data type of a computed column is not trusted; it may change across versions, including for `PROCTIME()` and user-defined functions. Some users reported that the result type of `PROCTIME()` should be `TIMESTAMP WITH LOCAL TIME ZONE`, not `TIMESTAMP WITHOUT TIME ZONE`. Users may also change their UDF implementation. If we just use the result type stored in the catalog, there will of course be an exception during code generation or at runtime because of the type mismatch.
   3) I don't want to treat the proctime and rowtime attributes specially when storing them in the catalog. They are no different from other computed columns and regular columns.
   
   Thus, we **have to** resolve the result type of computed columns again when we get the `CatalogTable` from the catalog. Actually, the initial `CatalogTable` we get from the catalog is an unresolved table (like an UnresolvedExpression). That's why we need a `CatalogTableSchemaResolver` to resolve the catalog table's schema.
   
   







[GitHub] [flink] godfreyhe commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r435801534



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.Arrays;
+
+/**
+ * The {@link CatalogTableSchemaResolver} is used to derive the correct result type of a computed column,
+ * because the data type of a computed column read from a catalog table is not trusted.
+ *
+ * <p>For example, for the `PROCTIME()` function, the type in the given TableSchema is Timestamp(3),
+ * but its correct type is Timestamp(3) *PROCTIME*.
+ */
+@Internal
+public class CatalogTableSchemaResolver {
+	public final Parser parser;
+
+	public CatalogTableSchemaResolver(Parser parser) {
+		this.parser = parser;
+	}
+
+	/**
+	 * Resolve the computed column's type for the given schema.
+	 *
+	 * @param tableSchema Table schema to derive table field names and data types
+	 * @param isStreamingMode Flag to determine whether the schema of a stream or batch table is created
+	 * @return the resolved TableSchema
+	 */
+	public TableSchema resolve(TableSchema tableSchema, boolean isStreamingMode) {
+		final String rowtime;
+		if (!tableSchema.getWatermarkSpecs().isEmpty()) {
+			// TODO: [FLINK-14473] we only support top-level rowtime attribute right now
+			rowtime = tableSchema.getWatermarkSpecs().get(0).getRowtimeAttribute();
+			if (rowtime.contains(".")) {
+				throw new ValidationException(
+						String.format("Nested field '%s' as rowtime attribute is not supported right now.", rowtime));
+			}
+		} else {
+			rowtime = null;
+		}
+
+		String[] fieldNames = tableSchema.getFieldNames();
+		DataType[] fieldTypes = Arrays.copyOf(tableSchema.getFieldDataTypes(), tableSchema.getFieldCount());
+
+		for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
+			TableColumn tableColumn = tableSchema.getTableColumns().get(i);
+			if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
+				if (fieldNames[i].equals(rowtime)) {
+					throw new TableException("proctime can't be defined on watermark spec.");
+				}
+				TimestampType originalType = (TimestampType) fieldTypes[i].getLogicalType();
+				LogicalType proctimeType = new TimestampType(
+						originalType.isNullable(),
+						TimestampKind.PROCTIME,
+						originalType.getPrecision());
+				fieldTypes[i] = TypeConversions.fromLogicalToDataType(proctimeType);
+			} else if (isStreamingMode && fieldNames[i].equals(rowtime)) {

Review comment:
       Yes, we need this: `LegacyCatalogSourceTable` erases the time indicator types and creates a watermark node only if `isStreamingMode` is true and the watermark spec is not empty (see `LegacyCatalogSourceTable`); without matching that here we would get a type mismatch error in batch mode.
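   
   For illustration, a minimal sketch (an illustrative helper, not code from this PR) of how a ROWTIME indicator can be erased back to a plain TIMESTAMP for batch mode; it uses only types that already appear in this change:
   
   ```java
   import org.apache.flink.table.types.DataType;
   import org.apache.flink.table.types.logical.LogicalType;
   import org.apache.flink.table.types.logical.TimestampKind;
   import org.apache.flink.table.types.logical.TimestampType;
   import org.apache.flink.table.types.utils.TypeConversions;
   
   public final class TimeIndicatorErasure {
       private TimeIndicatorErasure() {
       }
   
       /** Returns the given type with any ROWTIME indicator removed; other types pass through. */
       public static DataType eraseRowtime(DataType fieldType) {
           LogicalType logicalType = fieldType.getLogicalType();
           if (logicalType instanceof TimestampType
                   && ((TimestampType) logicalType).getKind() == TimestampKind.ROWTIME) {
               TimestampType ts = (TimestampType) logicalType;
               return TypeConversions.fromLogicalToDataType(
                       new TimestampType(ts.isNullable(), TimestampKind.REGULAR, ts.getPrecision()));
           }
           return fieldType;
       }
   }
   ```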




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r436600016



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.Arrays;
+
+/**
+ * The {@link CatalogTableSchemaResolver} is used to derive the correct result type of a computed column,
+ * because the data type of a computed column read from a catalog table is not trusted.
+ *
+ * <p>For example, for the `PROCTIME()` function, the type in the given TableSchema is Timestamp(3),
+ * but its correct type is Timestamp(3) *PROCTIME*.
+ */
+@Internal
+public class CatalogTableSchemaResolver {
+	public final Parser parser;
+
+	public CatalogTableSchemaResolver(Parser parser) {
+		this.parser = parser;
+	}
+
+	/**
+	 * Resolve the computed column's type for the given schema.
+	 *
+	 * @param tableSchema Table schema to derive table field names and data types
+	 * @param isStreamingMode Flag to determine whether the schema of a stream or batch table is created
+	 * @return the resolved TableSchema
+	 */
+	public TableSchema resolve(TableSchema tableSchema, boolean isStreamingMode) {
+		final String rowtime;
+		if (!tableSchema.getWatermarkSpecs().isEmpty()) {
+			// TODO: [FLINK-14473] we only support top-level rowtime attribute right now
+			rowtime = tableSchema.getWatermarkSpecs().get(0).getRowtimeAttribute();
+			if (rowtime.contains(".")) {
+				throw new ValidationException(
+						String.format("Nested field '%s' as rowtime attribute is not supported right now.", rowtime));
+			}
+		} else {
+			rowtime = null;
+		}
+
+		String[] fieldNames = tableSchema.getFieldNames();
+		DataType[] fieldTypes = Arrays.copyOf(tableSchema.getFieldDataTypes(), tableSchema.getFieldCount());
+
+		for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
+			TableColumn tableColumn = tableSchema.getTableColumns().get(i);
+			if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
+				if (fieldNames[i].equals(rowtime)) {
+					throw new TableException("proctime can't be defined on watermark spec.");
+				}
+				TimestampType originalType = (TimestampType) fieldTypes[i].getLogicalType();
+				LogicalType proctimeType = new TimestampType(
+						originalType.isNullable(),
+						TimestampKind.PROCTIME,
+						originalType.getPrecision());
+				fieldTypes[i] = TypeConversions.fromLogicalToDataType(proctimeType);
+			} else if (isStreamingMode && fieldNames[i].equals(rowtime)) {

Review comment:
       I created a JIRA to track this: https://issues.apache.org/jira/browse/FLINK-18180




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12335:
URL: https://github.com/apache/flink/pull/12335#issuecomment-633921246


   ## CI report:
   
   * 8f86268e43fdfdfd32c7ad4381e243eae33a123f Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2196) 
   * 1630336016f87ea92bb1186b0522bdcf0dca4fb3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r436004322



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -490,8 +490,11 @@ private void insertIntoInternal(UnresolvedIdentifier unresolvedIdentifier, Table
 	private Optional<CatalogQueryOperation> scanInternal(UnresolvedIdentifier identifier) {
 		ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
 
-		return catalogManager.getTable(tableIdentifier)
-			.map(t -> new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema()));
+		return catalogManager.getTable(tableIdentifier).map(t -> {
+			CatalogTableSchemaResolver resolver = new CatalogTableSchemaResolver(parser);

Review comment:
       Putting it in `CatalogManager#getTable` sounds good to me. We had thought about having a single util to resolve the schema, but resolving it before the CatalogTable is exposed out of CatalogManager is a better idea: it avoids forgetting to resolve the schema.
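   
   A rough sketch of that direction (the wrapper class is a hypothetical stand-in, not the actual `CatalogManager` code; only `CatalogTableSchemaResolver#resolve` comes from this PR):
   
   ```java
   import org.apache.flink.table.api.TableSchema;
   import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
   
   import java.util.Optional;
   
   /** Hypothetical lookup wrapper: every caller, Table API or SQL, sees a resolved schema. */
   final class ResolvingCatalogLookup {
       private final CatalogTableSchemaResolver resolver;
       private final boolean isStreamingMode;
   
       ResolvingCatalogLookup(CatalogTableSchemaResolver resolver, boolean isStreamingMode) {
           this.resolver = resolver;
           this.isStreamingMode = isStreamingMode;
       }
   
       /** Resolves computed-column and rowtime types exactly once, at lookup time. */
       Optional<TableSchema> lookupSchema(Optional<TableSchema> rawSchema) {
           return rawSchema.map(s -> resolver.resolve(s, isStreamingMode));
       }
   }
   ```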




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r436337888



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.Arrays;
+
+/**
+ * The {@link CatalogTableSchemaResolver} is used to derive the correct result type of a computed column,
+ * because the data type of a computed column read from a catalog table is not trusted.
+ *
+ * <p>For example, for the `PROCTIME()` function, the type in the given TableSchema is Timestamp(3),
+ * but its correct type is Timestamp(3) *PROCTIME*.
+ */
+@Internal
+public class CatalogTableSchemaResolver {
+	public final Parser parser;
+
+	public CatalogTableSchemaResolver(Parser parser) {
+		this.parser = parser;
+	}
+
+	/**
+	 * Resolve the computed column's type for the given schema.
+	 *
+	 * @param tableSchema Table schema to derive table field names and data types
+	 * @param isStreamingMode Flag to determine whether the schema of a stream or batch table is created
+	 * @return the resolved TableSchema
+	 */
+	public TableSchema resolve(TableSchema tableSchema, boolean isStreamingMode) {
+		final String rowtime;
+		if (!tableSchema.getWatermarkSpecs().isEmpty()) {
+			// TODO: [FLINK-14473] we only support top-level rowtime attribute right now
+			rowtime = tableSchema.getWatermarkSpecs().get(0).getRowtimeAttribute();
+			if (rowtime.contains(".")) {
+				throw new ValidationException(
+						String.format("Nested field '%s' as rowtime attribute is not supported right now.", rowtime));
+			}
+		} else {
+			rowtime = null;
+		}
+
+		String[] fieldNames = tableSchema.getFieldNames();
+		DataType[] fieldTypes = Arrays.copyOf(tableSchema.getFieldDataTypes(), tableSchema.getFieldCount());
+
+		for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
+			TableColumn tableColumn = tableSchema.getTableColumns().get(i);
+			if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
+				if (fieldNames[i].equals(rowtime)) {
+					throw new TableException("proctime can't be defined on watermark spec.");
+				}
+				TimestampType originalType = (TimestampType) fieldTypes[i].getLogicalType();
+				LogicalType proctimeType = new TimestampType(
+						originalType.isNullable(),
+						TimestampKind.PROCTIME,
+						originalType.getPrecision());
+				fieldTypes[i] = TypeConversions.fromLogicalToDataType(proctimeType);
+			} else if (isStreamingMode && fieldNames[i].equals(rowtime)) {

Review comment:
       I agree with you that we should push `isStreamingMode` into the planner as much as possible; many classes carry an `isStreamingMode` flag now. I also tried to remove `isStreamingMode` from `CatalogTableSchemaResolver`, but I found we would have to handle the "erase the rowtime type for batch" logic in at least three places (`TableEnvironmentImpl#scanInternal` for the Table API, `CatalogSchemaTable#getRowType` for `CatalogTable`, and `DatabaseCalciteSchema#getTable` for `QueryOperationCatalogView`), because we must make sure the type of the `Table` from the catalog and the type of the `RelNode` expanded from that `Table` (e.g. adding a projection node for computed columns, adding a watermark assigner node for the watermark, expanding different kinds of views) are the same. In the long term, I think we should also keep the rowtime type for batch (e.g. to support rowtime temporal joins); then we can remove `isStreamingMode` from `CatalogTableSchemaResolver`, and much other related logic can be simplified. (It is too complex to handle the different kinds of tables and views for both batch and streaming.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12335:
URL: https://github.com/apache/flink/pull/12335#issuecomment-633921246


   ## CI report:
   
   * 1d664d2df51b688cd74ff6bb3f4102419de68ecd Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2838) 
   * 527557dcd66932baaa79827fe2cfcc974f1aab40 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12335:
URL: https://github.com/apache/flink/pull/12335#issuecomment-633921246


   ## CI report:
   
   * 1630336016f87ea92bb1186b0522bdcf0dca4fb3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2777) 
   * 8b5dca51c1f186a6a11c590ed015a45f882f06c5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2814) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys closed pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
dawidwys closed pull request #12335:
URL: https://github.com/apache/flink/pull/12335


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12335:
URL: https://github.com/apache/flink/pull/12335#issuecomment-633921246


   ## CI report:
   
   * 527557dcd66932baaa79827fe2cfcc974f1aab40 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2868) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12335:
URL: https://github.com/apache/flink/pull/12335#issuecomment-633921246


   ## CI report:
   
   * 8b5dca51c1f186a6a11c590ed015a45f882f06c5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2814) 
   * 1d664d2df51b688cd74ff6bb3f4102419de68ecd Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2838) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12335:
URL: https://github.com/apache/flink/pull/12335#issuecomment-633921246


   ## CI report:
   
   * 1630336016f87ea92bb1186b0522bdcf0dca4fb3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=2777) 
   * 8b5dca51c1f186a6a11c590ed015a45f882f06c5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r435975124



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -490,8 +490,11 @@ private void insertIntoInternal(UnresolvedIdentifier unresolvedIdentifier, Table
 	private Optional<CatalogQueryOperation> scanInternal(UnresolvedIdentifier identifier) {
 		ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
 
-		return catalogManager.getTable(tableIdentifier)
-			.map(t -> new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema()));
+		return catalogManager.getTable(tableIdentifier).map(t -> {
+			CatalogTableSchemaResolver resolver = new CatalogTableSchemaResolver(parser);

Review comment:
       I am 90% on the same page with the resolution. The remaining 10 percent still thinks the stored type should be correct, but let's set that aside for now and assume I agree with the additional resolution.
   
   I still think we can place the resolution in a better location so it can be reused between the Table API & SQL. Would it work if we put it in `CatalogManager#getTable`? That way it would be adjusted for both SQL & the Table API.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r435747420



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/CatalogTableSchemaResolver.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.Arrays;
+
+/**
+ * The {@link CatalogTableSchemaResolver} is used to derive the correct result type of a computed column,
+ * because the data type of a computed column read from a catalog table is not trusted.
+ *
+ * <p>For example, for the `PROCTIME()` function, the type in the given TableSchema is Timestamp(3),
+ * but its correct type is Timestamp(3) *PROCTIME*.
+ */
+@Internal
+public class CatalogTableSchemaResolver {
+	public final Parser parser;
+
+	public CatalogTableSchemaResolver(Parser parser) {
+		this.parser = parser;
+	}
+
+	/**
+	 * Resolve the computed column's type for the given schema.
+	 *
+	 * @param tableSchema Table schema to derive table field names and data types
+	 * @param isStreamingMode Flag to determine whether the schema of a stream or batch table is created
+	 * @return the resolved TableSchema
+	 */
+	public TableSchema resolve(TableSchema tableSchema, boolean isStreamingMode) {
+		final String rowtime;
+		if (!tableSchema.getWatermarkSpecs().isEmpty()) {
+			// TODO: [FLINK-14473] we only support top-level rowtime attribute right now
+			rowtime = tableSchema.getWatermarkSpecs().get(0).getRowtimeAttribute();
+			if (rowtime.contains(".")) {
+				throw new ValidationException(
+						String.format("Nested field '%s' as rowtime attribute is not supported right now.", rowtime));
+			}
+		} else {
+			rowtime = null;
+		}
+
+		String[] fieldNames = tableSchema.getFieldNames();
+		DataType[] fieldTypes = Arrays.copyOf(tableSchema.getFieldDataTypes(), tableSchema.getFieldCount());
+
+		for (int i = 0; i < tableSchema.getFieldCount(); ++i) {
+			TableColumn tableColumn = tableSchema.getTableColumns().get(i);
+			if (tableColumn.isGenerated() && isProctimeType(tableColumn.getExpr().get(), tableSchema)) {
+				if (fieldNames[i].equals(rowtime)) {
+					throw new TableException("proctime can't be defined on watermark spec.");
+				}
+				TimestampType originalType = (TimestampType) fieldTypes[i].getLogicalType();
+				LogicalType proctimeType = new TimestampType(
+						originalType.isNullable(),
+						TimestampKind.PROCTIME,
+						originalType.getPrecision());
+				fieldTypes[i] = TypeConversions.fromLogicalToDataType(proctimeType);
+			} else if (isStreamingMode && fieldNames[i].equals(rowtime)) {

Review comment:
       Do we really need to differentiate between streaming and non-streaming mode? IMO we shouldn't. The kind should just be extra meta-information in the schema that can be ignored in batch mode.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -490,8 +490,11 @@ private void insertIntoInternal(UnresolvedIdentifier unresolvedIdentifier, Table
 	private Optional<CatalogQueryOperation> scanInternal(UnresolvedIdentifier identifier) {
 		ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
 
-		return catalogManager.getTable(tableIdentifier)
-			.map(t -> new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema()));
+		return catalogManager.getTable(tableIdentifier).map(t -> {
+			CatalogTableSchemaResolver resolver = new CatalogTableSchemaResolver(parser);

Review comment:
       Shouldn't this resolution be part of the `TableSchema` itself? IMO, when a watermark spec is applied to a `TableSchema`, it should adjust the type of the rowtime attribute field. Applying that transformation in the `TableEnvironment` is too late.
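   
   A minimal sketch of that alternative (the builder calls are existing Flink API; the implied type rewrite inside `watermark(...)` is the hypothetical change, not current behavior):
   
   ```java
   import org.apache.flink.table.api.DataTypes;
   import org.apache.flink.table.api.TableSchema;
   
   public class WatermarkAwareSchemaExample {
       public static void main(String[] args) {
           TableSchema schema = TableSchema.builder()
                   .field("id", DataTypes.BIGINT())
                   .field("ts", DataTypes.TIMESTAMP(3))
                   // Today this call only records the watermark spec; under the proposal it
                   // would also rewrite "ts" to TIMESTAMP(3) *ROWTIME* inside the schema.
                   .watermark("ts", "ts - INTERVAL '5' SECOND", DataTypes.TIMESTAMP(3))
                   .build();
           System.out.println(schema);
       }
   }
   ```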




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on a change in pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r436334572



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -490,8 +490,11 @@ private void insertIntoInternal(UnresolvedIdentifier unresolvedIdentifier, Table
 	private Optional<CatalogQueryOperation> scanInternal(UnresolvedIdentifier identifier) {
 		ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
 
-		return catalogManager.getTable(tableIdentifier)
-			.map(t -> new CatalogQueryOperation(tableIdentifier, t.getTable().getSchema()));
+		return catalogManager.getTable(tableIdentifier).map(t -> {
+			CatalogTableSchemaResolver resolver = new CatalogTableSchemaResolver(parser);

Review comment:
       Thanks for the suggestion. I also agree that we should move the resolution logic into `CatalogManager#getTable` as a unified place for the Table API and SQL.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #12335: [FLINK-17753] [table-planner-blink] Fix watermark defined in ddl does not work in Table api

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12335:
URL: https://github.com/apache/flink/pull/12335#issuecomment-633913253






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org