You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/13 10:12:54 UTC

[GitHub] [flink] slinkydeveloper opened a new pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

slinkydeveloper opened a new pull request #18349:
URL: https://github.com/apache/flink/pull/18349


   ## What is the purpose of the change
   
   This PR removes the workaround of generating a random `ObjectIdentifier` for anonymous inline tables, that is:
   
   * Tables defined only with `TableDescriptor`, e.g. `TableEnvironment#from(TableDescriptor)`.
   * `DataStream` from/to conversions
   * `TableResult#collect`
   
   Now these tables don't go through the `CatalogManager` anymore and they don't have an assigned `ObjectIdentifier` anymore.
   
   Also, for every kind of table (permanent, temporary and anonymous), the table resolved schema, options, and everything inside `ResolvedCatalogTable` is propagated directly through the stack, rather than querying the `Catalog` at a later stage of planning. Among the others, this is a requirement to fix https://issues.apache.org/jira/browse/FLINK-25386.  
   
   ## Brief change log
   
   * `DynamicTableFactory.Context#getObjectIdentifier` deprecated and replaced with `DynamicTableFactory.Context#getIdentifier`, which now returns a `Optional<ObjectIdentifier>`.
   * Move `TableLookupResult` to the upper level and renamed to `ContextResolvedTable`. Now this POJO is used to propagate `ResolvedCatalogTable`,  `ObjectIdentifier` (if any) and associated `Catalog` (if any) throughout the stack. See the Javadoc for details.
   * Renamed `CatalogQueryOperation` and `CatalogSinkModifyOperation` to `SourceQueryOperation` and `SinkModifyOperation` and propagate `ContextResolvedTable`. The renaming is done as the name `Catalog` is confusing and implies that the table can be found inside a catalog, which is not the case anymore for anonymous tables.
   * Methods in `Table`, `TableEnvironment` and `StatementSet` creating/accepting anonymous tables now don't generate `ObjectIdentifier` anymore.
   * Methods in `Table`, `TableEnvironment` and `StatementSet` using `ObjectIdentifier` will perform a lookup at the call site, rather than later during the planning. Note that this is a *breaking change* in the behaviour of the APIs, as before one could first define the pipeline and later fill the catalog with the tables, while with this change the invocation of a method like `tEnv.from(String tableName)` will fail if the table has not been included inside the catalog before. Neverthless, this shouldn't be a problem for majority of the users and will improve the usability of our APIs, as now they "fail fast".
   * Now `QueryOperationConverter` manually creates the `RelNode` for anonymous tables, skipping Calcite table resolution
   * Every intermediate table representation between the `Operation`s and the final `ExecNode`s now embed `ContextResolvedTable`, rather than `ObjectIdentifier` and `ResolvedCatalogTable`.
   
   Because this is a substantial refactoring, in order to simplify the review, I isolated the most important changes in separate commits. The last commit is a big one, and essentially includes the propagation of the renamings, the propagation of `ContextResolvedTable` through the stack, and little changes to remove the assumption that `ObjectIdentifier` is always non null. 
   
   There is also an hotfix "Add default implementation for `DynamicTableFactory.Context#getEnrichmentOptions` to alleviate breaking change" not related to this PR.
   
   ## Verifying this change
   
   All the changes are verified by existing tests. Some tests had been adapted and hardened, see changes to `TableEnvironmentTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: yes
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 47668982c9897b1f8d8d7ffa8b10ac8a134f088c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418) 
   * a14694e186c08ce08e5e4af5d60f410f43025965 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18349:
URL: https://github.com/apache/flink/pull/18349#discussion_r785791932



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java
##########
@@ -84,7 +90,7 @@ public QueryOperation getChild() {
     @Override
     public String asSummaryString() {
         Map<String, Object> params = new LinkedHashMap<>();
-        params.put("identifier", tableIdentifier);
+        params.put("identifier", contextResolvedTable);

Review comment:
       I reverted and now I print the identifier as before, I think it's better to keep `identifier` as the string key to remain consistent with other operations




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18349:
URL: https://github.com/apache/flink/pull/18349#discussion_r787463301



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StatementSet.java
##########
@@ -62,10 +62,10 @@
      * written to a table (backed by a {@link DynamicTableSink}) expressed via the given {@link
      * TableDescriptor}.
      *
-     * <p>The given {@link TableDescriptor descriptor} is registered as an inline (i.e. anonymous)
-     * temporary catalog table (see {@link TableEnvironment#createTemporaryTable(String,
-     * TableDescriptor)}. Then a statement is added to the statement set that inserts the {@link
-     * Table} object's pipeline into that temporary table.
+     * <p>The given {@link TableDescriptor descriptor} won't be registered in the catalog, but it
+     * will be propagated directly in the operation tree, adding a statement to the statement set

Review comment:
       Reworded, check now




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6bf560bcaba4f16941a56b9e08b19044957c7a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380) 
   * face8aee78f7f301464b03690fab1070447edd6b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "CANCELED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 47668982c9897b1f8d8d7ffa8b10ac8a134f088c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 47668982c9897b1f8d8d7ffa8b10ac8a134f088c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418) 
   * a14694e186c08ce08e5e4af5d60f410f43025965 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584",
       "triggerID" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29620",
       "triggerID" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "triggerType" : "PUSH"
     }, {
       "hash" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29625",
       "triggerID" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8dd59b65de6e71d57da22d35686b0758ff3aed24 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29620) 
   * 04379c97fabc5ee28e30dfcd6485bf8e5c0da69e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29625) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584",
       "triggerID" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29620",
       "triggerID" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "triggerType" : "PUSH"
     }, {
       "hash" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29625",
       "triggerID" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e4399cdc5964a3f49354cb114f5c043f5b2702dc",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29654",
       "triggerID" : "e4399cdc5964a3f49354cb114f5c043f5b2702dc",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6a2b67f6358c91a443a8b2d99643e5298d08ba14",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "6a2b67f6358c91a443a8b2d99643e5298d08ba14",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e4399cdc5964a3f49354cb114f5c043f5b2702dc Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29654) 
   * 6a2b67f6358c91a443a8b2d99643e5298d08ba14 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6bf560bcaba4f16941a56b9e08b19044957c7a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380) 
   * face8aee78f7f301464b03690fab1070447edd6b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387) 
   * 104164685e8d378ea3983b2b7ed3aafaa71dcec4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18349:
URL: https://github.com/apache/flink/pull/18349#discussion_r785793354



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
##########
@@ -19,40 +19,45 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-/** Describes a relational operation that was created from a lookup to a catalog. */
+/**
+ * Describes a query operation from a {@link ContextResolvedTable}.
+ *
+ * <p>The source table is described by {@link #getContextResolvedTable()}, and in general is used
+ * for every source which implementation is defined with {@link DynamicTableSource}. {@code
+ * DataStream} sources are handled by {@code DataStreamQueryOperation}.
+ */
 @Internal
-public class CatalogQueryOperation implements QueryOperation {
+public class SourceQueryOperation implements QueryOperation {
 
-    private final ObjectIdentifier tableIdentifier;
-    private final ResolvedSchema resolvedSchema;
+    private final ContextResolvedTable contextResolvedTable;
 
-    public CatalogQueryOperation(ObjectIdentifier tableIdentifier, ResolvedSchema resolvedSchema) {
-        this.tableIdentifier = tableIdentifier;
-        this.resolvedSchema = resolvedSchema;
+    public SourceQueryOperation(ContextResolvedTable contextResolvedTable) {
+        this.contextResolvedTable = contextResolvedTable;
     }
 
-    public ObjectIdentifier getTableIdentifier() {
-        return tableIdentifier;
+    public ContextResolvedTable getContextResolvedTable() {
+        return contextResolvedTable;
     }
 
     @Override
     public ResolvedSchema getResolvedSchema() {
-        return resolvedSchema;
+        return this.contextResolvedTable.getResolvedSchema();
     }
 
     @Override
     public String asSummaryString() {
         Map<String, Object> args = new LinkedHashMap<>();
-        args.put("identifier", tableIdentifier);
-        args.put("fields", resolvedSchema.getColumnNames());
+        args.put("identifier", this.contextResolvedTable.toString());

Review comment:
       Same comment as here https://github.com/apache/flink/pull/18349#discussion_r785791932




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f7aa49ab83f748fe9cbf1186add84929ea50bfb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584",
       "triggerID" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29620",
       "triggerID" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "triggerType" : "PUSH"
     }, {
       "hash" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29625",
       "triggerID" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 04379c97fabc5ee28e30dfcd6485bf8e5c0da69e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29625) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584",
       "triggerID" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0aa7220300adc4f5f9ef0aa59608f299b3b9ac29 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584) 
   * 8dd59b65de6e71d57da22d35686b0758ff3aed24 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584",
       "triggerID" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29620",
       "triggerID" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "triggerType" : "PUSH"
     }, {
       "hash" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8dd59b65de6e71d57da22d35686b0758ff3aed24 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29620) 
   * 04379c97fabc5ee28e30dfcd6485bf8e5c0da69e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a14694e186c08ce08e5e4af5d60f410f43025965 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541) 
   * 8f7aa49ab83f748fe9cbf1186add84929ea50bfb UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] twalthr commented on a change in pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #18349:
URL: https://github.com/apache/flink/pull/18349#discussion_r784698819



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -263,7 +265,9 @@ public static DynamicTableSink createDynamicTableSink(
                             "Unable to create a sink for writing table '%s'.\n\n"

Review comment:
       same comment as above

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -235,7 +235,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
         KafkaConnectorOptionsUtil.validateDeliveryGuarantee(tableOptions);
 
         validatePKConstraints(
-                context.getObjectIdentifier(),
+                context.getIdentifier().map(ObjectIdentifier::toString).orElse("anonymous"),

Review comment:
       adapt the error message to `The Kafka table with '%s' format...`. Having `"anonymous"` in the code base is not helpful, in this case we should just omit `'%s'`

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SinkModifyOperation.java
##########
@@ -84,7 +90,7 @@ public QueryOperation getChild() {
     @Override
     public String asSummaryString() {
         Map<String, Object> params = new LinkedHashMap<>();
-        params.put("identifier", tableIdentifier);
+        params.put("identifier", contextResolvedTable);

Review comment:
       `identifier` -> `table`

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class contains information about a table, its {@link ResolvedSchema}, its options and its
+ * relationship with a {@link Catalog}, if any.
+ *
+ * <p>There can be 3 kinds of {@link ContextResolvedTable}:
+ *
+ * <ul>
+ *   <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and it's flagged as temporary.
+ *   <li>A anonymous/inline table: a table which is not stored in a catalog and doesn't have an
+ *       associated unique {@link ObjectIdentifier}.
+ * </ul>
+ *
+ * <p>The different handling of temporary and permanent tables is {@link Catalog} and {@link
+ * CatalogManager} instance specific, hence for these two kind of tables, an instance of this object
+ * represents the relationship between the specific {@link ResolvedCatalogBaseTable} instance and
+ * the specific {@link Catalog}/{@link CatalogManager} instances. For example, the same {@link
+ * ResolvedCatalogBaseTable} can be temporary for one catalog, but permanent for another one.
+ */
+@Internal
+public class ContextResolvedTable {
+
+    private final @Nullable ObjectIdentifier objectIdentifier;
+    private final @Nullable Catalog catalog;
+    private final ResolvedCatalogBaseTable<?> resolvedTable;
+
+    public static ContextResolvedTable permanent(
+            ObjectIdentifier identifier,
+            Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                identifier, Preconditions.checkNotNull(catalog), resolvedTable);
+    }
+
+    public static ContextResolvedTable temporary(
+            ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(identifier, null, resolvedTable);
+    }
+
+    public static ContextResolvedTable anonymous(ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(null, null, resolvedTable);
+    }
+
+    private ContextResolvedTable(
+            @Nullable ObjectIdentifier objectIdentifier,
+            @Nullable Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        this.objectIdentifier = objectIdentifier;
+        this.catalog = catalog;
+        this.resolvedTable = resolvedTable;
+    }
+
+    public boolean isAnonymous() {
+        return objectIdentifier == null;
+    }
+
+    /** @return true if the table is temporary. An anonymous table is always temporary. */
+    public boolean isTemporary() {
+        return catalog == null;
+    }
+
+    public boolean isPermanent() {
+        return !isTemporary();
+    }
+
+    /** Returns empty if {@link #isAnonymous()} is true. */
+    public Optional<ObjectIdentifier> getIdentifier() {
+        return Optional.ofNullable(objectIdentifier);
+    }
+
+    /** Returns empty if {@link #isPermanent()} is false. */
+    public Optional<Catalog> getCatalog() {
+        return Optional.ofNullable(catalog);
+    }
+
+    /** Returns a fully resolved catalog object. */
+    @SuppressWarnings("unchecked")
+    public <T extends ResolvedCatalogBaseTable<?>> T getResolvedTable() {
+        return (T) resolvedTable;
+    }
+
+    public ResolvedSchema getResolvedSchema() {
+        return resolvedTable.getResolvedSchema();
+    }
+
+    /** Returns the original metadata object returned by the catalog. */
+    @SuppressWarnings("unchecked")
+    public <T extends CatalogBaseTable> T getTable() {
+        return (T) resolvedTable.getOrigin();
+    }
+
+    /**
+     * Copy the {@link ContextResolvedTable}, replacing the underlying {@link CatalogTable} options.
+     */
+    public ContextResolvedTable copy(Map<String, String> newOptions) {
+        if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException("Cannot copy VIEW with new options.");

Review comment:
       `new ValidationException("The view '%s' cannot be enriched with new options.")`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
##########
@@ -340,14 +347,46 @@ private RelNode convertLegacyTableFunction(
         }
 
         @Override
-        public RelNode visit(CatalogQueryOperation catalogTable) {
-            ObjectIdentifier objectIdentifier = catalogTable.getTableIdentifier();
-            return relBuilder
-                    .scan(
-                            objectIdentifier.getCatalogName(),
-                            objectIdentifier.getDatabaseName(),
-                            objectIdentifier.getObjectName())
-                    .build();
+        public RelNode visit(SourceQueryOperation catalogTable) {
+            Optional<ObjectIdentifier> objectIdentifier =
+                    catalogTable.getContextResolvedTable().getIdentifier();
+            if (objectIdentifier.isPresent()) {

Review comment:
       can't we just create helper method that takes `ContextResolvedTable`? We don't need to perform another lookup.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java
##########
@@ -76,10 +78,23 @@ public StatementSet addInsert(String targetPath, Table table, boolean overwrite)
                 tableEnvironment.getParser().parseIdentifier(targetPath);
         ObjectIdentifier objectIdentifier =
                 tableEnvironment.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+        ContextResolvedTable contextResolvedTable =
+                tableEnvironment
+                        .getCatalogManager()
+                        .getTable(objectIdentifier)
+                        .orElseThrow(
+                                () ->
+                                        new TableException(

Review comment:
       Instead of coming up with an exception at every location. Introduce a `CatalogManager.getTableOrError`.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
##########
@@ -56,7 +57,12 @@ public ResolvedSchema getResolvedSchema() {
     @Override
     public String asSummaryString() {
         Map<String, Object> args = new LinkedHashMap<>();
-        args.put("identifier", this.contextResolvedTable.toString());
+        args.put(
+                "identifier",
+                this.contextResolvedTable

Review comment:
       improve this

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/DatabaseCalciteSchema.java
##########
@@ -74,15 +74,14 @@ public Table getTable(String tableName) {
                 .map(
                         lookupResult ->
                                 new CatalogSchemaTable(
-                                        identifier,
                                         lookupResult,
                                         getStatistic(lookupResult, identifier),
                                         isStreamingMode))
                 .orElse(null);
     }
 
     private FlinkStatistic getStatistic(
-            TableLookupResult lookupResult, ObjectIdentifier identifier) {
+            ContextResolvedTable lookupResult, ObjectIdentifier identifier) {

Review comment:
       rename variables in this class

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
##########
@@ -287,27 +289,28 @@ public RexNode visitInputRef(RexInputRef inputRef) {
             if (!catalogOptional.isPresent()) {
                 throw new TableException(
                         String.format(
-                                "Table %s must from a catalog, but %s is not a catalog",
-                                identifier.asSummaryString(), identifier.getCatalogName()));
+                                "Table '%s' factory doesn't provide partitions, and it cannot be loaded from the catalog",

Review comment:
       `factory` -> `connector`
   
   It is nice that you would like to improve the exception. But please do this in a hotfix commit. It is useful on its own. Or collect all minor improvements in one umbrella commit.

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/sinks/TableSinkUtils.scala
##########
@@ -56,10 +56,10 @@ object TableSinkUtils {
     * @param partitionKeys The partition keys of this table.
     */
   def validateTableSink(
-      sinkOperation: CatalogSinkModifyOperation,
-      sinkIdentifier: ObjectIdentifier,
-      sink: TableSink[_],
-      partitionKeys: Seq[String]): Unit = {
+                         sinkOperation: SinkModifyOperation,

Review comment:
       fix indention

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ExternalModifyOperation.java
##########
@@ -96,7 +79,6 @@ public ResolvedSchema getResolvedSchema() {
     @Override
     public String asSummaryString() {
         final Map<String, Object> args = new LinkedHashMap<>();
-        args.put("identifier", tableIdentifier);

Review comment:
       add a `asSummaryString` to `ContextResolvedTable` to make this summary string helpful again

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class contains information about a table, its {@link ResolvedSchema}, its options and its
+ * relationship with a {@link Catalog}, if any.
+ *
+ * <p>There can be 3 kinds of {@link ContextResolvedTable}:
+ *
+ * <ul>
+ *   <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and it's flagged as temporary.
+ *   <li>A anonymous/inline table: a table which is not stored in a catalog and doesn't have an
+ *       associated unique {@link ObjectIdentifier}.
+ * </ul>
+ *
+ * <p>The different handling of temporary and permanent tables is {@link Catalog} and {@link
+ * CatalogManager} instance specific, hence for these two kind of tables, an instance of this object
+ * represents the relationship between the specific {@link ResolvedCatalogBaseTable} instance and
+ * the specific {@link Catalog}/{@link CatalogManager} instances. For example, the same {@link
+ * ResolvedCatalogBaseTable} can be temporary for one catalog, but permanent for another one.
+ */
+@Internal
+public class ContextResolvedTable {
+
+    private final @Nullable ObjectIdentifier objectIdentifier;
+    private final @Nullable Catalog catalog;
+    private final ResolvedCatalogBaseTable<?> resolvedTable;
+
+    public static ContextResolvedTable permanent(
+            ObjectIdentifier identifier,
+            Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                identifier, Preconditions.checkNotNull(catalog), resolvedTable);
+    }
+
+    public static ContextResolvedTable temporary(
+            ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(identifier, null, resolvedTable);

Review comment:
       add `Preconditions.checkNotNull`

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
##########
@@ -80,9 +83,21 @@
     @PublicEvolving
     interface Context {
 
-        /** Returns the identifier of the table in the {@link Catalog}. */
+        /**
+         * Returns the identifier of the table in the {@link Catalog}.
+         *
+         * @deprecated Because the table {@link ObjectIdentifier} could be null (e.g. for anonymous
+         *     tables), you should use {@link #getIdentifier()} instead.
+         */
+        @Deprecated
+        @Nullable
         ObjectIdentifier getObjectIdentifier();
 
+        /** Returns the identifier of the table in the {@link Catalog}, if any. */

Review comment:
       Add: `The identifier is empty for anonymous/inline tables defined in Table API.`

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class contains information about a table, its {@link ResolvedSchema}, its options and its
+ * relationship with a {@link Catalog}, if any.
+ *
+ * <p>There can be 3 kinds of {@link ContextResolvedTable}:
+ *
+ * <ul>
+ *   <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and it's flagged as temporary.

Review comment:
       `it's` -> `is`

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class contains information about a table, its {@link ResolvedSchema}, its options and its
+ * relationship with a {@link Catalog}, if any.
+ *
+ * <p>There can be 3 kinds of {@link ContextResolvedTable}:
+ *
+ * <ul>
+ *   <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and it's flagged as temporary.
+ *   <li>A anonymous/inline table: a table which is not stored in a catalog and doesn't have an

Review comment:
       `A` -> `An`

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class contains information about a table, its {@link ResolvedSchema}, its options and its
+ * relationship with a {@link Catalog}, if any.
+ *
+ * <p>There can be 3 kinds of {@link ContextResolvedTable}:
+ *
+ * <ul>
+ *   <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and it's flagged as temporary.
+ *   <li>A anonymous/inline table: a table which is not stored in a catalog and doesn't have an
+ *       associated unique {@link ObjectIdentifier}.
+ * </ul>
+ *
+ * <p>The different handling of temporary and permanent tables is {@link Catalog} and {@link
+ * CatalogManager} instance specific, hence for these two kind of tables, an instance of this object
+ * represents the relationship between the specific {@link ResolvedCatalogBaseTable} instance and
+ * the specific {@link Catalog}/{@link CatalogManager} instances. For example, the same {@link
+ * ResolvedCatalogBaseTable} can be temporary for one catalog, but permanent for another one.
+ */
+@Internal
+public class ContextResolvedTable {
+
+    private final @Nullable ObjectIdentifier objectIdentifier;
+    private final @Nullable Catalog catalog;
+    private final ResolvedCatalogBaseTable<?> resolvedTable;
+
+    public static ContextResolvedTable permanent(
+            ObjectIdentifier identifier,
+            Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                identifier, Preconditions.checkNotNull(catalog), resolvedTable);
+    }
+
+    public static ContextResolvedTable temporary(
+            ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(identifier, null, resolvedTable);
+    }
+
+    public static ContextResolvedTable anonymous(ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(null, null, resolvedTable);
+    }
+
+    private ContextResolvedTable(
+            @Nullable ObjectIdentifier objectIdentifier,
+            @Nullable Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        this.objectIdentifier = objectIdentifier;
+        this.catalog = catalog;
+        this.resolvedTable = resolvedTable;
+    }
+
+    public boolean isAnonymous() {
+        return objectIdentifier == null;
+    }
+
+    /** @return true if the table is temporary. An anonymous table is always temporary. */
+    public boolean isTemporary() {
+        return catalog == null;
+    }
+
+    public boolean isPermanent() {
+        return !isTemporary();
+    }
+
+    /** Returns empty if {@link #isAnonymous()} is true. */
+    public Optional<ObjectIdentifier> getIdentifier() {
+        return Optional.ofNullable(objectIdentifier);
+    }
+
+    /** Returns empty if {@link #isPermanent()} is false. */
+    public Optional<Catalog> getCatalog() {
+        return Optional.ofNullable(catalog);
+    }
+
+    /** Returns a fully resolved catalog object. */
+    @SuppressWarnings("unchecked")
+    public <T extends ResolvedCatalogBaseTable<?>> T getResolvedTable() {
+        return (T) resolvedTable;
+    }
+
+    public ResolvedSchema getResolvedSchema() {
+        return resolvedTable.getResolvedSchema();
+    }
+
+    /** Returns the original metadata object returned by the catalog. */
+    @SuppressWarnings("unchecked")
+    public <T extends CatalogBaseTable> T getTable() {
+        return (T) resolvedTable.getOrigin();
+    }
+
+    /**
+     * Copy the {@link ContextResolvedTable}, replacing the underlying {@link CatalogTable} options.
+     */
+    public ContextResolvedTable copy(Map<String, String> newOptions) {
+        if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException("Cannot copy VIEW with new options.");
+        }
+        return new ContextResolvedTable(
+                objectIdentifier, catalog, ((ResolvedCatalogTable) resolvedTable).copy(newOptions));
+    }
+
+    @Override
+    public String toString() {

Review comment:
       let's skip this method and the method below in this commit, I see you added more logic in the next one.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
##########
@@ -19,40 +19,45 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-/** Describes a relational operation that was created from a lookup to a catalog. */
+/**
+ * Describes a query operation from a {@link ContextResolvedTable}.
+ *
+ * <p>The source table is described by {@link #getContextResolvedTable()}, and in general is used
+ * for every source which implementation is defined with {@link DynamicTableSource}. {@code
+ * DataStream} sources are handled by {@code DataStreamQueryOperation}.

Review comment:
       `DataStreamQueryOperation` -> `ExternalQueryOperation`

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
##########
@@ -19,40 +19,45 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-/** Describes a relational operation that was created from a lookup to a catalog. */
+/**
+ * Describes a query operation from a {@link ContextResolvedTable}.
+ *
+ * <p>The source table is described by {@link #getContextResolvedTable()}, and in general is used
+ * for every source which implementation is defined with {@link DynamicTableSource}. {@code
+ * DataStream} sources are handled by {@code DataStreamQueryOperation}.
+ */
 @Internal
-public class CatalogQueryOperation implements QueryOperation {
+public class SourceQueryOperation implements QueryOperation {
 
-    private final ObjectIdentifier tableIdentifier;
-    private final ResolvedSchema resolvedSchema;
+    private final ContextResolvedTable contextResolvedTable;
 
-    public CatalogQueryOperation(ObjectIdentifier tableIdentifier, ResolvedSchema resolvedSchema) {
-        this.tableIdentifier = tableIdentifier;
-        this.resolvedSchema = resolvedSchema;
+    public SourceQueryOperation(ContextResolvedTable contextResolvedTable) {
+        this.contextResolvedTable = contextResolvedTable;
     }
 
-    public ObjectIdentifier getTableIdentifier() {
-        return tableIdentifier;
+    public ContextResolvedTable getContextResolvedTable() {
+        return contextResolvedTable;
     }
 
     @Override
     public ResolvedSchema getResolvedSchema() {
-        return resolvedSchema;
+        return this.contextResolvedTable.getResolvedSchema();

Review comment:
       nit: remove `this.`

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -160,7 +160,9 @@ public static DynamicTableSource createDynamicTableSource(
                             "Unable to create a source for reading table '%s'.\n\n"
                                     + "Table options are:\n\n"
                                     + "%s",
-                            objectIdentifier.asSummaryString(),
+                            objectIdentifier != null

Review comment:
       Improve error message to `Unable to create a source for reading the table.`

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java
##########
@@ -19,40 +19,45 @@
 package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-/** Describes a relational operation that was created from a lookup to a catalog. */
+/**
+ * Describes a query operation from a {@link ContextResolvedTable}.
+ *
+ * <p>The source table is described by {@link #getContextResolvedTable()}, and in general is used
+ * for every source which implementation is defined with {@link DynamicTableSource}. {@code
+ * DataStream} sources are handled by {@code DataStreamQueryOperation}.
+ */
 @Internal
-public class CatalogQueryOperation implements QueryOperation {
+public class SourceQueryOperation implements QueryOperation {
 
-    private final ObjectIdentifier tableIdentifier;
-    private final ResolvedSchema resolvedSchema;
+    private final ContextResolvedTable contextResolvedTable;
 
-    public CatalogQueryOperation(ObjectIdentifier tableIdentifier, ResolvedSchema resolvedSchema) {
-        this.tableIdentifier = tableIdentifier;
-        this.resolvedSchema = resolvedSchema;
+    public SourceQueryOperation(ContextResolvedTable contextResolvedTable) {
+        this.contextResolvedTable = contextResolvedTable;
     }
 
-    public ObjectIdentifier getTableIdentifier() {
-        return tableIdentifier;
+    public ContextResolvedTable getContextResolvedTable() {
+        return contextResolvedTable;
     }
 
     @Override
     public ResolvedSchema getResolvedSchema() {
-        return resolvedSchema;
+        return this.contextResolvedTable.getResolvedSchema();
     }
 
     @Override
     public String asSummaryString() {
         Map<String, Object> args = new LinkedHashMap<>();
-        args.put("identifier", tableIdentifier);
-        args.put("fields", resolvedSchema.getColumnNames());
+        args.put("identifier", this.contextResolvedTable.toString());

Review comment:
       `identifier` -> `table`

##########
File path: flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/operations/ExternalQueryOperation.java
##########
@@ -93,12 +82,12 @@ public ChangelogMode getChangelogMode() {
     @Override
     public String asSummaryString() {
         final Map<String, Object> args = new LinkedHashMap<>();
-        args.put("identifier", identifier);
+        args.put("identifier", contextResolvedTable);

Review comment:
       `identifier` -> `table`

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
##########
@@ -1375,10 +1375,9 @@ default Table limit(int offset, int fetch) {
      * table (backed by a {@link DynamicTableSink}) expressed via the given {@link TableDescriptor}.
      * It executes the insert operation.
      *
-     * <p>The {@link TableDescriptor descriptor} is registered as an inline (i.e. anonymous)
-     * temporary catalog table (see {@link TableEnvironment#createTemporaryTable(String,
-     * TableDescriptor)}) using a unique identifier. Note that calling this method multiple times,
-     * even with the same descriptor, results in multiple sink tables being registered.
+     * <p>The {@link TableDescriptor descriptor} won't be registered in the catalog, but it will be
+     * propagated directly in the operation tree. Note that calling this method multiple times, even
+     * with the same descriptor, results in multiple sink tables being registered.

Review comment:
       `registered` is not the right work here anymore

##########
File path: flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
##########
@@ -202,17 +207,12 @@ public static Executor lookupExecutor(
         final ResolvedSchema resolvedSchema =
                 schemaResolver.resolve(schemaTranslationResult.getSchema());
 
-        final UnresolvedIdentifier unresolvedIdentifier =
-                UnresolvedIdentifier.of(
-                        "Unregistered_DataStream_Sink_" + ExternalModifyOperation.getUniqueId());
-        final ObjectIdentifier objectIdentifier =
-                catalogManager.qualifyIdentifier(unresolvedIdentifier);
-
         final ExternalModifyOperation modifyOperation =
                 new ExternalModifyOperation(
-                        objectIdentifier,
+                        ContextResolvedTable.anonymous(
+                                new ResolvedCatalogTable(

Review comment:
       Same comment as above. call `CatalogManager.resolveCatalogTable` directly.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StatementSet.java
##########
@@ -62,10 +62,10 @@
      * written to a table (backed by a {@link DynamicTableSink}) expressed via the given {@link
      * TableDescriptor}.
      *
-     * <p>The given {@link TableDescriptor descriptor} is registered as an inline (i.e. anonymous)
-     * temporary catalog table (see {@link TableEnvironment#createTemporaryTable(String,
-     * TableDescriptor)}. Then a statement is added to the statement set that inserts the {@link
-     * Table} object's pipeline into that temporary table.
+     * <p>The given {@link TableDescriptor descriptor} won't be registered in the catalog, but it
+     * will be propagated directly in the operation tree, adding a statement to the statement set

Review comment:
       Very long sentence that mentions descriptor two times. Break it up into two sentences?

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1161,7 +1160,7 @@ public TableResultInternal executeInternal(Operation operation) {
         } else if (operation instanceof ShowCreateTableOperation) {
             ShowCreateTableOperation showCreateTableOperation =
                     (ShowCreateTableOperation) operation;
-            Optional<CatalogManager.TableLookupResult> result =
+            Optional<ContextResolvedTable> result =

Review comment:
       nit: maybe rename the variable here and below?

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -1513,10 +1512,9 @@ private TableResultInternal buildResult(String[] headers, DataType[] types, Obje
         List<String> tableNames = new ArrayList<>(operations.size());
         Map<String, Integer> tableNameToCount = new HashMap<>();
         for (ModifyOperation operation : operations) {
-            if (operation instanceof CatalogSinkModifyOperation) {
-                ObjectIdentifier identifier =
-                        ((CatalogSinkModifyOperation) operation).getTableIdentifier();
-                String fullName = identifier.asSummaryString();
+            if (operation instanceof SinkModifyOperation) {
+                String fullName =
+                        ((SinkModifyOperation) operation).getContextResolvedTable().toString();

Review comment:
       again, we should not use `toString` but dedicated methods that indicate the importance like `asSummaryString`. There are too many strings floating around in the stack.

##########
File path: flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
##########
@@ -138,31 +141,33 @@ public static Executor lookupExecutor(
         final SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
         final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder();
 
-        final UnresolvedIdentifier unresolvedIdentifier;
-        if (viewPath != null) {
-            unresolvedIdentifier = getParser().parseIdentifier(viewPath);
-        } else {
-            unresolvedIdentifier =
-                    UnresolvedIdentifier.of("Unregistered_DataStream_Source_" + dataStream.getId());
-        }
-        final ObjectIdentifier objectIdentifier =
-                catalogManager.qualifyIdentifier(unresolvedIdentifier);
-
         final SchemaTranslator.ConsumingResult schemaTranslationResult =
                 SchemaTranslator.createConsumingResult(
                         catalogManager.getDataTypeFactory(), dataStream.getType(), schema);
 
         final ResolvedSchema resolvedSchema =
                 schemaTranslationResult.getSchema().resolve(schemaResolver);
+        final ResolvedCatalogTable resolvedCatalogTable =
+                new ResolvedCatalogTable(new ExternalCatalogTable(resolvedSchema), resolvedSchema);

Review comment:
       `ExternalCatalogTable` could also take the unresolved schema directly and we just pass the whole thing to `CatalogManager.resolveCatalogTable` no special logic required.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CollectModifyOperation.java
##########
@@ -48,19 +45,14 @@
     // QueryOperation.getResolvedSchema()
     private DataType consumedDataType;
 
-    public CollectModifyOperation(ObjectIdentifier tableIdentifier, QueryOperation child) {
-        this.tableIdentifier = tableIdentifier;
+    public CollectModifyOperation(QueryOperation child) {
         this.child = child;
     }
 
     public static int getUniqueId() {

Review comment:
       can this be removed as well?

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
##########
@@ -340,14 +347,46 @@ private RelNode convertLegacyTableFunction(
         }
 
         @Override
-        public RelNode visit(CatalogQueryOperation catalogTable) {
-            ObjectIdentifier objectIdentifier = catalogTable.getTableIdentifier();
-            return relBuilder
-                    .scan(
-                            objectIdentifier.getCatalogName(),
-                            objectIdentifier.getDatabaseName(),
-                            objectIdentifier.getObjectName())
-                    .build();
+        public RelNode visit(SourceQueryOperation catalogTable) {
+            Optional<ObjectIdentifier> objectIdentifier =
+                    catalogTable.getContextResolvedTable().getIdentifier();
+            if (objectIdentifier.isPresent()) {
+                return relBuilder
+                        .scan(
+                                objectIdentifier.get().getCatalogName(),
+                                objectIdentifier.get().getDatabaseName(),
+                                objectIdentifier.get().getObjectName())
+                        .build();
+            }
+            return createAnonymousLogicalTableScan(catalogTable);
+        }
+
+        /** This manually creates the logical table scan to skip the calcite catalog resolution. */
+        private RelNode createAnonymousLogicalTableScan(SourceQueryOperation catalogTable) {
+            // Statistics are unknown for anonymous tables
+            // Look at DatabaseCalciteSchema#getStatistic for more details
+            FlinkStatistic flinkStatistic =

Review comment:
       nit: maybe move all this code to `CatalogSourceTable.createAnonymous(ContextResolvedTable, boolean isBatchMode)`

##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java
##########
@@ -202,8 +202,8 @@ public CatalogSinkModifyOperation createInsertOperation(
         UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(targetTablePath);
         ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
 
-        return new CatalogSinkModifyOperation(
-                identifier,
+        return new SinkModifyOperation(
+                catalogManager.getTable(identifier).get(),

Review comment:
       use the previously mentioned `getTableOrError` everywhere, it should fail but if it does we have a nice exception

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
##########
@@ -137,10 +142,12 @@
     private final AggregateVisitor aggregateVisitor = new AggregateVisitor();
     private final TableAggregateVisitor tableAggregateVisitor = new TableAggregateVisitor();
     private final JoinExpressionVisitor joinExpressionVisitor = new JoinExpressionVisitor();
+    private final boolean isStreamingMode;
 
-    public QueryOperationConverter(FlinkRelBuilder relBuilder) {
+    public QueryOperationConverter(FlinkRelBuilder relBuilder, boolean isStreamingMode) {

Review comment:
       nit: do `isBatchMode` because batch is a special case of streaming, this is kind of inconsistent in the code base right now

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
##########
@@ -340,14 +347,46 @@ private RelNode convertLegacyTableFunction(
         }
 
         @Override
-        public RelNode visit(CatalogQueryOperation catalogTable) {
-            ObjectIdentifier objectIdentifier = catalogTable.getTableIdentifier();
-            return relBuilder
-                    .scan(
-                            objectIdentifier.getCatalogName(),
-                            objectIdentifier.getDatabaseName(),
-                            objectIdentifier.getObjectName())
-                    .build();
+        public RelNode visit(SourceQueryOperation catalogTable) {
+            Optional<ObjectIdentifier> objectIdentifier =
+                    catalogTable.getContextResolvedTable().getIdentifier();
+            if (objectIdentifier.isPresent()) {
+                return relBuilder
+                        .scan(
+                                objectIdentifier.get().getCatalogName(),
+                                objectIdentifier.get().getDatabaseName(),
+                                objectIdentifier.get().getObjectName())
+                        .build();
+            }
+            return createAnonymousLogicalTableScan(catalogTable);
+        }
+
+        /** This manually creates the logical table scan to skip the calcite catalog resolution. */
+        private RelNode createAnonymousLogicalTableScan(SourceQueryOperation catalogTable) {

Review comment:
       give parameter a better name

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
##########
@@ -593,8 +606,20 @@ public TableResult executeInsert(TableDescriptor descriptor, boolean overwrite)
         final TableDescriptor updatedDescriptor =
                 descriptor.toBuilder().schema(schemaTranslationResult.getSchema()).build();
 
-        tableEnvironment.createTemporaryTable(path, updatedDescriptor);
-        return executeInsert(path, overwrite);
+        final ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
+                tableEnvironment
+                        .getCatalogManager()
+                        .resolveCatalogBaseTable(updatedDescriptor.toCatalogTable());
+
+        ModifyOperation operation =
+                new SinkModifyOperation(
+                        ContextResolvedTable.anonymous(resolvedCatalogBaseTable),
+                        getQueryOperation(),
+                        Collections.emptyMap(),
+                        overwrite,
+                        Collections.emptyMap());
+
+        return tableEnvironment.executeInternal(Collections.singletonList(operation));

Review comment:
       reduce code duplication a bit by having an private `executeInsert` that takes `(ContextResolvedTable, boolean overwrite)`

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -210,7 +210,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 startupOptions.startupMode,
                 startupOptions.specificOffsets,
                 startupOptions.startupTimestampMillis,
-                context.getIdentifier().map(ObjectIdentifier::toString).orElse("anonymous"));
+                context.getIdentifier().map(ObjectIdentifier::asSummaryString).orElse("anonymous"));

Review comment:
       let's introduce another context identifier

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
##########
@@ -80,28 +79,28 @@ public static RelNode convertDataStreamToRel(
             boolean isBatchMode,
             ReadableConfig config,
             FlinkRelBuilder relBuilder,
-            ObjectIdentifier identifier,
-            ResolvedSchema schema,
+            ContextResolvedTable contextResolvedTable,
             DataStream<?> dataStream,
             DataType physicalDataType,
             boolean isTopLevelRecord,
             ChangelogMode changelogMode) {
-        final CatalogTable unresolvedTable = new InlineCatalogTable(schema);
-        final ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(unresolvedTable, schema);
         final DynamicTableSource tableSource =
                 new ExternalDynamicSource<>(
-                        identifier, dataStream, physicalDataType, isTopLevelRecord, changelogMode);
+                        dataStream, physicalDataType, isTopLevelRecord, changelogMode);
         final FlinkStatistic statistic =
                 FlinkStatistic.builder()
                         // this is a temporary solution, FLINK-15123 will resolve this

Review comment:
       this exists too many times. create a method `FlinkStatistic.create(ResolvedSchema)` or so.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/ExternalDynamicSource.java
##########
@@ -126,11 +121,8 @@ private String generateOperatorName() {
 
     private String generateOperatorDesc() {
         return String.format(
-                "DataSteamToTable(stream=%s, type=%s, rowtime=%s, watermark=%s)",
-                identifier.asSummaryString(),
-                physicalDataType.toString(),
-                produceRowtimeMetadata,
-                propagateWatermark);
+                "DataSteamToTable(type=%s, rowtime=%s, watermark=%s)",

Review comment:
       readd an identifier in the summary

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
##########
@@ -215,7 +216,7 @@ public void testTableSource() {
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
                         0);
-        assertEquals(actualKafkaSource, expectedKafkaSource);
+        Assertions.assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);

Review comment:
       static import

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableASOperation.java
##########
@@ -26,25 +26,32 @@
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.function.Supplier;
 
 /** Operation to describe a CREATE TABLE AS statement. */
 @Internal
 public class CreateTableASOperation implements CreateOperation {
 
     private final CreateTableOperation createTableOperation;
-    private final SinkModifyOperation insertOperation;
+    private final Supplier<SinkModifyOperation> insertOperationFactory;
+
+    private SinkModifyOperation insertOperation;

Review comment:
       no mutability in operations, pass the resolved table to `getInsertOperation(...)` instead.

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
##########
@@ -18,52 +18,52 @@
 
 package org.apache.flink.table.planner.plan.schema
 
-import org.apache.flink.table.catalog.{ObjectIdentifier, ResolvedCatalogTable}
+import org.apache.flink.table.catalog.ContextResolvedTable
 import org.apache.flink.table.connector.source.DynamicTableSource
 import org.apache.flink.table.planner.calcite.FlinkContext
 import org.apache.flink.table.planner.connectors.DynamicSourceUtils
 import org.apache.flink.table.planner.plan.abilities.source.{SourceAbilityContext, SourceAbilitySpec}
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
 
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.RelOptSchema
 import org.apache.calcite.rel.`type`.RelDataType
 
 import java.util
+import java.util.Collections
 
 /**
  * A [[FlinkPreparingTableBase]] implementation which defines the context variables
  * required to translate the Calcite [[org.apache.calcite.plan.RelOptTable]] to the Flink specific
  * relational expression with [[DynamicTableSource]].
  *
  * @param relOptSchema The RelOptSchema that this table comes from
- * @param tableIdentifier The full path of the table to retrieve.
  * @param rowType The table row type
  * @param statistic The table statistics
  * @param tableSource The [[DynamicTableSource]] for which is converted to a Calcite Table
  * @param isStreamingMode A flag that tells if the current table is in stream mode
- * @param catalogTable Resolved catalog table where this table source table comes from
+ * @param contextResolvedTable Resolved catalog table where this table source table comes from
  * @param flinkContext The flink context which is used to generate extra digests based on
  *                     abilitySpecs
  * @param abilitySpecs The abilitySpecs applied to the source
  */
 class TableSourceTable(
     relOptSchema: RelOptSchema,
-    val tableIdentifier: ObjectIdentifier,
     rowType: RelDataType,
     statistic: FlinkStatistic,
     val tableSource: DynamicTableSource,
     val isStreamingMode: Boolean,
-    val catalogTable: ResolvedCatalogTable,
+    val contextResolvedTable: ContextResolvedTable,
     val flinkContext: FlinkContext,
     val abilitySpecs: Array[SourceAbilitySpec] = Array.empty)
   extends FlinkPreparingTableBase(
     relOptSchema,
     rowType,
-    util.Arrays.asList(
-      tableIdentifier.getCatalogName,
-      tableIdentifier.getDatabaseName,
-      tableIdentifier.getObjectName),
+    toScala(contextResolvedTable.getIdentifier).map(id => util.Arrays.asList(

Review comment:
       improve formatting here or introduce a helper method

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java
##########
@@ -114,24 +110,31 @@ private ResolvedCatalogTable createFinalCatalogTable(
                             FlinkHints.HINT_NAME_OPTIONS,
                             TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED.key()));
         }
-        return catalogTable.copy(
-                FlinkHints.mergeTableOptions(hintedOptions, catalogTable.getOptions()));
+        return schemaTable
+                .getContextResolvedTable()

Review comment:
       nit: store context resolved table in a local variable to improve code readability

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java
##########
@@ -76,10 +78,23 @@ public StatementSet addInsert(String targetPath, Table table, boolean overwrite)
                 tableEnvironment.getParser().parseIdentifier(targetPath);
         ObjectIdentifier objectIdentifier =
                 tableEnvironment.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+        ContextResolvedTable contextResolvedTable =
+                tableEnvironment
+                        .getCatalogManager()
+                        .getTable(objectIdentifier)
+                        .orElseThrow(
+                                () ->
+                                        new TableException(

Review comment:
       nit: introduce a local variable when referencing catalog manager multiple times

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
##########
@@ -340,14 +347,46 @@ private RelNode convertLegacyTableFunction(
         }
 
         @Override
-        public RelNode visit(CatalogQueryOperation catalogTable) {
-            ObjectIdentifier objectIdentifier = catalogTable.getTableIdentifier();
-            return relBuilder
-                    .scan(
-                            objectIdentifier.getCatalogName(),
-                            objectIdentifier.getDatabaseName(),
-                            objectIdentifier.getObjectName())
-                    .build();
+        public RelNode visit(SourceQueryOperation catalogTable) {
+            Optional<ObjectIdentifier> objectIdentifier =
+                    catalogTable.getContextResolvedTable().getIdentifier();
+            if (objectIdentifier.isPresent()) {

Review comment:
       or do we need to do this for the statistics?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6bf560bcaba4f16941a56b9e08b19044957c7a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380) 
   * face8aee78f7f301464b03690fab1070447edd6b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387) 
   * 104164685e8d378ea3983b2b7ed3aafaa71dcec4 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6bf560bcaba4f16941a56b9e08b19044957c7a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380) 
   * face8aee78f7f301464b03690fab1070447edd6b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "",
       "status" : "CANCELED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 47668982c9897b1f8d8d7ffa8b10ac8a134f088c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f7aa49ab83f748fe9cbf1186add84929ea50bfb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555) 
   * 0aa7220300adc4f5f9ef0aa59608f299b3b9ac29 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6bf560bcaba4f16941a56b9e08b19044957c7a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380) 
   * face8aee78f7f301464b03690fab1070447edd6b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 47668982c9897b1f8d8d7ffa8b10ac8a134f088c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418) 
   * a14694e186c08ce08e5e4af5d60f410f43025965 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584",
       "triggerID" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29620",
       "triggerID" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "triggerType" : "PUSH"
     }, {
       "hash" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29625",
       "triggerID" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e4399cdc5964a3f49354cb114f5c043f5b2702dc",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29654",
       "triggerID" : "e4399cdc5964a3f49354cb114f5c043f5b2702dc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 04379c97fabc5ee28e30dfcd6485bf8e5c0da69e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29625) 
   * e4399cdc5964a3f49354cb114f5c043f5b2702dc Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29654) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1012963542


   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6bf560bcaba4f16941a56b9e08b19044957c7a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18349:
URL: https://github.com/apache/flink/pull/18349#discussion_r785791088



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * This class contains information about a table, its {@link ResolvedSchema}, its options and its
+ * relationship with a {@link Catalog}, if any.
+ *
+ * <p>There can be 3 kinds of {@link ContextResolvedTable}:
+ *
+ * <ul>
+ *   <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and it's flagged as temporary.
+ *   <li>A anonymous/inline table: a table which is not stored in a catalog and doesn't have an
+ *       associated unique {@link ObjectIdentifier}.
+ * </ul>
+ *
+ * <p>The different handling of temporary and permanent tables is {@link Catalog} and {@link
+ * CatalogManager} instance specific, hence for these two kind of tables, an instance of this object
+ * represents the relationship between the specific {@link ResolvedCatalogBaseTable} instance and
+ * the specific {@link Catalog}/{@link CatalogManager} instances. For example, the same {@link
+ * ResolvedCatalogBaseTable} can be temporary for one catalog, but permanent for another one.
+ */
+@Internal
+public class ContextResolvedTable {
+
+    private final @Nullable ObjectIdentifier objectIdentifier;
+    private final @Nullable Catalog catalog;
+    private final ResolvedCatalogBaseTable<?> resolvedTable;
+
+    public static ContextResolvedTable permanent(
+            ObjectIdentifier identifier,
+            Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                identifier, Preconditions.checkNotNull(catalog), resolvedTable);
+    }
+
+    public static ContextResolvedTable temporary(
+            ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(identifier, null, resolvedTable);
+    }
+
+    public static ContextResolvedTable anonymous(ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(null, null, resolvedTable);
+    }
+
+    private ContextResolvedTable(
+            @Nullable ObjectIdentifier objectIdentifier,
+            @Nullable Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        this.objectIdentifier = objectIdentifier;
+        this.catalog = catalog;
+        this.resolvedTable = resolvedTable;
+    }
+
+    public boolean isAnonymous() {
+        return objectIdentifier == null;
+    }
+
+    /** @return true if the table is temporary. An anonymous table is always temporary. */
+    public boolean isTemporary() {
+        return catalog == null;
+    }
+
+    public boolean isPermanent() {
+        return !isTemporary();
+    }
+
+    /** Returns empty if {@link #isAnonymous()} is true. */
+    public Optional<ObjectIdentifier> getIdentifier() {
+        return Optional.ofNullable(objectIdentifier);
+    }
+
+    /** Returns empty if {@link #isPermanent()} is false. */
+    public Optional<Catalog> getCatalog() {
+        return Optional.ofNullable(catalog);
+    }
+
+    /** Returns a fully resolved catalog object. */
+    @SuppressWarnings("unchecked")
+    public <T extends ResolvedCatalogBaseTable<?>> T getResolvedTable() {
+        return (T) resolvedTable;
+    }
+
+    public ResolvedSchema getResolvedSchema() {
+        return resolvedTable.getResolvedSchema();
+    }
+
+    /** Returns the original metadata object returned by the catalog. */
+    @SuppressWarnings("unchecked")
+    public <T extends CatalogBaseTable> T getTable() {
+        return (T) resolvedTable.getOrigin();
+    }
+
+    /**
+     * Copy the {@link ContextResolvedTable}, replacing the underlying {@link CatalogTable} options.
+     */
+    public ContextResolvedTable copy(Map<String, String> newOptions) {
+        if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException("Cannot copy VIEW with new options.");
+        }
+        return new ContextResolvedTable(
+                objectIdentifier, catalog, ((ResolvedCatalogTable) resolvedTable).copy(newOptions));
+    }
+
+    @Override
+    public String toString() {

Review comment:
       I kept it for debugging but i'm not relying on it anymore




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18349:
URL: https://github.com/apache/flink/pull/18349#discussion_r785834280



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
##########
@@ -80,28 +79,28 @@ public static RelNode convertDataStreamToRel(
             boolean isBatchMode,
             ReadableConfig config,
             FlinkRelBuilder relBuilder,
-            ObjectIdentifier identifier,
-            ResolvedSchema schema,
+            ContextResolvedTable contextResolvedTable,
             DataStream<?> dataStream,
             DataType physicalDataType,
             boolean isTopLevelRecord,
             ChangelogMode changelogMode) {
-        final CatalogTable unresolvedTable = new InlineCatalogTable(schema);
-        final ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(unresolvedTable, schema);
         final DynamicTableSource tableSource =
                 new ExternalDynamicSource<>(
-                        identifier, dataStream, physicalDataType, isTopLevelRecord, changelogMode);
+                        dataStream, physicalDataType, isTopLevelRecord, changelogMode);
         final FlinkStatistic statistic =
                 FlinkStatistic.builder()
                         // this is a temporary solution, FLINK-15123 will resolve this

Review comment:
       WDYM? I see this combination of the FlinkStatistic builder usage only once




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584",
       "triggerID" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29620",
       "triggerID" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "triggerType" : "PUSH"
     }, {
       "hash" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29625",
       "triggerID" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e4399cdc5964a3f49354cb114f5c043f5b2702dc",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29654",
       "triggerID" : "e4399cdc5964a3f49354cb114f5c043f5b2702dc",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6a2b67f6358c91a443a8b2d99643e5298d08ba14",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29694",
       "triggerID" : "6a2b67f6358c91a443a8b2d99643e5298d08ba14",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6a2b67f6358c91a443a8b2d99643e5298d08ba14 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29694) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584",
       "triggerID" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29620",
       "triggerID" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "triggerType" : "PUSH"
     }, {
       "hash" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29625",
       "triggerID" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e4399cdc5964a3f49354cb114f5c043f5b2702dc",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29654",
       "triggerID" : "e4399cdc5964a3f49354cb114f5c043f5b2702dc",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6a2b67f6358c91a443a8b2d99643e5298d08ba14",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29694",
       "triggerID" : "6a2b67f6358c91a443a8b2d99643e5298d08ba14",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e4399cdc5964a3f49354cb114f5c043f5b2702dc Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29654) 
   * 6a2b67f6358c91a443a8b2d99643e5298d08ba14 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29694) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584",
       "triggerID" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29620",
       "triggerID" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "triggerType" : "PUSH"
     }, {
       "hash" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29625",
       "triggerID" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e4399cdc5964a3f49354cb114f5c043f5b2702dc",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29654",
       "triggerID" : "e4399cdc5964a3f49354cb114f5c043f5b2702dc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e4399cdc5964a3f49354cb114f5c043f5b2702dc Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29654) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6bf560bcaba4f16941a56b9e08b19044957c7a Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380) 
   * face8aee78f7f301464b03690fab1070447edd6b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387) 
   * 104164685e8d378ea3983b2b7ed3aafaa71dcec4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011990586


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit ab6bf560bcaba4f16941a56b9e08b19044957c7a (Thu Jan 13 10:15:43 UTC 2022)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6bf560bcaba4f16941a56b9e08b19044957c7a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380) 
   * face8aee78f7f301464b03690fab1070447edd6b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6bf560bcaba4f16941a56b9e08b19044957c7a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18349:
URL: https://github.com/apache/flink/pull/18349#discussion_r785806163



##########
File path: flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/operations/ExternalQueryOperation.java
##########
@@ -93,12 +82,12 @@ public ChangelogMode getChangelogMode() {
     @Override
     public String asSummaryString() {
         final Map<String, Object> args = new LinkedHashMap<>();
-        args.put("identifier", identifier);
+        args.put("identifier", contextResolvedTable);

Review comment:
       Same comment as here https://github.com/apache/flink/pull/18349#discussion_r785791932




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6bf560bcaba4f16941a56b9e08b19044957c7a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380) 
   * face8aee78f7f301464b03690fab1070447edd6b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387) 
   * 104164685e8d378ea3983b2b7ed3aafaa71dcec4 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6bf560bcaba4f16941a56b9e08b19044957c7a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380) 
   * face8aee78f7f301464b03690fab1070447edd6b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6bf560bcaba4f16941a56b9e08b19044957c7a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380) 
   * face8aee78f7f301464b03690fab1070447edd6b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387) 
   * 104164685e8d378ea3983b2b7ed3aafaa71dcec4 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584",
       "triggerID" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29620",
       "triggerID" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0aa7220300adc4f5f9ef0aa59608f299b3b9ac29 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584) 
   * 8dd59b65de6e71d57da22d35686b0758ff3aed24 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29620) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "31716fed804fd92e352ceab5f74965856d3c0765",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "31716fed804fd92e352ceab5f74965856d3c0765",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a14694e186c08ce08e5e4af5d60f410f43025965 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541) 
   * 31716fed804fd92e352ceab5f74965856d3c0765 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18349:
URL: https://github.com/apache/flink/pull/18349#discussion_r787459410



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class contains information about a table, its {@link ResolvedSchema}, its options and its
+ * relationship with a {@link Catalog}, if any.
+ *
+ * <p>There can be 3 kinds of {@link ContextResolvedTable}:
+ *
+ * <ul>
+ *   <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and is flagged as temporary.
+ *   <li>An anonymous/inline table: a table which is not stored in a catalog and doesn't have an
+ *       associated unique {@link ObjectIdentifier}.
+ * </ul>
+ *
+ * <p>The different handling of temporary and permanent tables is {@link Catalog} and {@link
+ * CatalogManager} instance specific, hence for these two kind of tables, an instance of this object
+ * represents the relationship between the specific {@link ResolvedCatalogBaseTable} instance and
+ * the specific {@link Catalog}/{@link CatalogManager} instances. For example, the same {@link
+ * ResolvedCatalogBaseTable} can be temporary for one catalog, but permanent for another one.
+ */
+@Internal
+public class ContextResolvedTable {
+
+    private static final AtomicInteger uniqueId = new AtomicInteger(0);
+
+    private final ObjectIdentifier objectIdentifier;
+    private final @Nullable Catalog catalog;
+    private final ResolvedCatalogBaseTable<?> resolvedTable;
+    private final boolean anonymous;
+
+    public static ContextResolvedTable permanent(
+            ObjectIdentifier identifier,
+            Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                identifier, Preconditions.checkNotNull(catalog), resolvedTable, false);
+    }
+
+    public static ContextResolvedTable temporary(
+            ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(identifier, null, resolvedTable, false);
+    }
+
+    public static ContextResolvedTable anonymous(ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                ObjectIdentifier.ofAnonymous(
+                        generateAnonymousStringIdentifier(null, resolvedTable)),
+                null,
+                resolvedTable,
+                true);
+    }
+
+    public static ContextResolvedTable anonymous(
+            String hint, ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                ObjectIdentifier.ofAnonymous(
+                        generateAnonymousStringIdentifier(hint, resolvedTable)),
+                null,
+                resolvedTable,
+                true);
+    }
+
+    private ContextResolvedTable(
+            ObjectIdentifier objectIdentifier,
+            @Nullable Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable,
+            boolean anonymous) {
+        this.objectIdentifier = Preconditions.checkNotNull(objectIdentifier);
+        this.catalog = catalog;
+        this.resolvedTable = Preconditions.checkNotNull(resolvedTable);
+        this.anonymous = anonymous;
+    }
+
+    public boolean isAnonymous() {
+        return this.anonymous;
+    }
+
+    /** @return true if the table is temporary. An anonymous table is always temporary. */
+    public boolean isTemporary() {
+        return catalog == null;
+    }
+
+    public boolean isPermanent() {
+        return !isTemporary();
+    }
+
+    public ObjectIdentifier getIdentifier() {
+        return objectIdentifier;
+    }
+
+    /** Returns empty if {@link #isPermanent()} is false. */
+    public Optional<Catalog> getCatalog() {
+        return Optional.ofNullable(catalog);
+    }
+
+    /** Returns a fully resolved catalog object. */
+    @SuppressWarnings("unchecked")
+    public <T extends ResolvedCatalogBaseTable<?>> T getResolvedTable() {
+        return (T) resolvedTable;
+    }
+
+    public ResolvedSchema getResolvedSchema() {
+        return resolvedTable.getResolvedSchema();
+    }
+
+    /** Returns the original metadata object returned by the catalog. */
+    @SuppressWarnings("unchecked")
+    public <T extends CatalogBaseTable> T getTable() {
+        return (T) resolvedTable.getOrigin();
+    }
+
+    /**
+     * Copy the {@link ContextResolvedTable}, replacing the underlying {@link CatalogTable} options.
+     */
+    public ContextResolvedTable copy(Map<String, String> newOptions) {
+        if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            throw new ValidationException(
+                    String.format("The view '%s' cannot be enriched with new options.", this));

Review comment:
       Added this specific error about hints at the call site.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18349:
URL: https://github.com/apache/flink/pull/18349#discussion_r787453251



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -157,7 +157,7 @@ public static DynamicTableSource createDynamicTableSource(
         } catch (Throwable t) {
             throw new ValidationException(
                     String.format(
-                            "Unable to create a source for reading table '%s'.\n\n"
+                            "Unable to create a source for reading the table '%s'.\n\n"

Review comment:
       What did you meant with this comment then? https://github.com/apache/flink/pull/18349#discussion_r784698670 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18349:
URL: https://github.com/apache/flink/pull/18349#discussion_r787503439



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableASOperation.java
##########
@@ -26,25 +26,32 @@
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.function.Supplier;
 
 /** Operation to describe a CREATE TABLE AS statement. */
 @Internal
 public class CreateTableASOperation implements CreateOperation {
 
     private final CreateTableOperation createTableOperation;
-    private final SinkModifyOperation insertOperation;
+    private final Supplier<SinkModifyOperation> insertOperationFactory;
+
+    private SinkModifyOperation insertOperation;

Review comment:
       I've modified a bit the structure of `CreateTableASOperation` to make the summary string senseful (and similar to `SinkModifyOperation`) and remove that internal mutability. check now




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] twalthr closed pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
twalthr closed pull request #18349:
URL: https://github.com/apache/flink/pull/18349


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 47668982c9897b1f8d8d7ffa8b10ac8a134f088c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab6bf560bcaba4f16941a56b9e08b19044957c7a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380) 
   * face8aee78f7f301464b03690fab1070447edd6b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387) 
   * 104164685e8d378ea3983b2b7ed3aafaa71dcec4 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "31716fed804fd92e352ceab5f74965856d3c0765",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "31716fed804fd92e352ceab5f74965856d3c0765",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 47668982c9897b1f8d8d7ffa8b10ac8a134f088c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418) 
   * a14694e186c08ce08e5e4af5d60f410f43025965 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541) 
   * 31716fed804fd92e352ceab5f74965856d3c0765 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18349:
URL: https://github.com/apache/flink/pull/18349#discussion_r785825772



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
##########
@@ -340,14 +347,46 @@ private RelNode convertLegacyTableFunction(
         }
 
         @Override
-        public RelNode visit(CatalogQueryOperation catalogTable) {
-            ObjectIdentifier objectIdentifier = catalogTable.getTableIdentifier();
-            return relBuilder
-                    .scan(
-                            objectIdentifier.getCatalogName(),
-                            objectIdentifier.getDatabaseName(),
-                            objectIdentifier.getObjectName())
-                    .build();
+        public RelNode visit(SourceQueryOperation catalogTable) {
+            Optional<ObjectIdentifier> objectIdentifier =
+                    catalogTable.getContextResolvedTable().getIdentifier();
+            if (objectIdentifier.isPresent()) {

Review comment:
       I think we need it for statistics, see `DatabaseCalciteSchema`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18349:
URL: https://github.com/apache/flink/pull/18349#discussion_r785843570



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableASOperation.java
##########
@@ -26,25 +26,32 @@
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.function.Supplier;
 
 /** Operation to describe a CREATE TABLE AS statement. */
 @Internal
 public class CreateTableASOperation implements CreateOperation {
 
     private final CreateTableOperation createTableOperation;
-    private final SinkModifyOperation insertOperation;
+    private final Supplier<SinkModifyOperation> insertOperationFactory;
+
+    private SinkModifyOperation insertOperation;

Review comment:
       I think I can't, check `asSummaryString`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a14694e186c08ce08e5e4af5d60f410f43025965 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541) 
   * 8f7aa49ab83f748fe9cbf1186add84929ea50bfb Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584",
       "triggerID" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29620",
       "triggerID" : "8dd59b65de6e71d57da22d35686b0758ff3aed24",
       "triggerType" : "PUSH"
     }, {
       "hash" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29625",
       "triggerID" : "04379c97fabc5ee28e30dfcd6485bf8e5c0da69e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e4399cdc5964a3f49354cb114f5c043f5b2702dc",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e4399cdc5964a3f49354cb114f5c043f5b2702dc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 04379c97fabc5ee28e30dfcd6485bf8e5c0da69e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29625) 
   * e4399cdc5964a3f49354cb114f5c043f5b2702dc UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584",
       "triggerID" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f7aa49ab83f748fe9cbf1186add84929ea50bfb Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555) 
   * 0aa7220300adc4f5f9ef0aa59608f299b3b9ac29 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18349:
URL: https://github.com/apache/flink/pull/18349#issuecomment-1011991081


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29380",
       "triggerID" : "ab6bf560bcaba4f16941a56b9e08b19044957c7a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "face8aee78f7f301464b03690fab1070447edd6b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29387",
       "triggerID" : "face8aee78f7f301464b03690fab1070447edd6b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29390",
       "triggerID" : "104164685e8d378ea3983b2b7ed3aafaa71dcec4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29418",
       "triggerID" : "47668982c9897b1f8d8d7ffa8b10ac8a134f088c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "1012963542",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29541",
       "triggerID" : "a14694e186c08ce08e5e4af5d60f410f43025965",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29555",
       "triggerID" : "8f7aa49ab83f748fe9cbf1186add84929ea50bfb",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584",
       "triggerID" : "0aa7220300adc4f5f9ef0aa59608f299b3b9ac29",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0aa7220300adc4f5f9ef0aa59608f299b3b9ac29 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29584) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] twalthr commented on a change in pull request #18349: [FLINK-25609][table] Anonymous/inline tables don't require ObjectIdentifier anymore

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #18349:
URL: https://github.com/apache/flink/pull/18349#discussion_r786804756



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class contains information about a table, its {@link ResolvedSchema}, its options and its
+ * relationship with a {@link Catalog}, if any.
+ *
+ * <p>There can be 3 kinds of {@link ContextResolvedTable}:
+ *
+ * <ul>
+ *   <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and is flagged as temporary.
+ *   <li>An anonymous/inline table: a table which is not stored in a catalog and doesn't have an
+ *       associated unique {@link ObjectIdentifier}.
+ * </ul>
+ *
+ * <p>The different handling of temporary and permanent tables is {@link Catalog} and {@link
+ * CatalogManager} instance specific, hence for these two kind of tables, an instance of this object
+ * represents the relationship between the specific {@link ResolvedCatalogBaseTable} instance and
+ * the specific {@link Catalog}/{@link CatalogManager} instances. For example, the same {@link
+ * ResolvedCatalogBaseTable} can be temporary for one catalog, but permanent for another one.
+ */
+@Internal
+public class ContextResolvedTable {
+
+    private static final AtomicInteger uniqueId = new AtomicInteger(0);
+
+    private final ObjectIdentifier objectIdentifier;
+    private final @Nullable Catalog catalog;
+    private final ResolvedCatalogBaseTable<?> resolvedTable;
+    private final boolean anonymous;
+
+    public static ContextResolvedTable permanent(
+            ObjectIdentifier identifier,
+            Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                identifier, Preconditions.checkNotNull(catalog), resolvedTable, false);
+    }
+
+    public static ContextResolvedTable temporary(
+            ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(identifier, null, resolvedTable, false);
+    }
+
+    public static ContextResolvedTable anonymous(ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                ObjectIdentifier.ofAnonymous(
+                        generateAnonymousStringIdentifier(null, resolvedTable)),
+                null,
+                resolvedTable,
+                true);
+    }
+
+    public static ContextResolvedTable anonymous(
+            String hint, ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                ObjectIdentifier.ofAnonymous(
+                        generateAnonymousStringIdentifier(hint, resolvedTable)),
+                null,
+                resolvedTable,
+                true);
+    }
+
+    private ContextResolvedTable(
+            ObjectIdentifier objectIdentifier,
+            @Nullable Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable,
+            boolean anonymous) {
+        this.objectIdentifier = Preconditions.checkNotNull(objectIdentifier);
+        this.catalog = catalog;
+        this.resolvedTable = Preconditions.checkNotNull(resolvedTable);
+        this.anonymous = anonymous;
+    }
+
+    public boolean isAnonymous() {
+        return this.anonymous;
+    }
+
+    /** @return true if the table is temporary. An anonymous table is always temporary. */
+    public boolean isTemporary() {
+        return catalog == null;
+    }
+
+    public boolean isPermanent() {
+        return !isTemporary();
+    }
+
+    public ObjectIdentifier getIdentifier() {
+        return objectIdentifier;
+    }
+
+    /** Returns empty if {@link #isPermanent()} is false. */
+    public Optional<Catalog> getCatalog() {
+        return Optional.ofNullable(catalog);
+    }
+
+    /** Returns a fully resolved catalog object. */
+    @SuppressWarnings("unchecked")
+    public <T extends ResolvedCatalogBaseTable<?>> T getResolvedTable() {
+        return (T) resolvedTable;
+    }
+
+    public ResolvedSchema getResolvedSchema() {
+        return resolvedTable.getResolvedSchema();
+    }
+
+    /** Returns the original metadata object returned by the catalog. */
+    @SuppressWarnings("unchecked")
+    public <T extends CatalogBaseTable> T getTable() {
+        return (T) resolvedTable.getOrigin();
+    }
+
+    /**
+     * Copy the {@link ContextResolvedTable}, replacing the underlying {@link CatalogTable} options.
+     */
+    public ContextResolvedTable copy(Map<String, String> newOptions) {
+        if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            throw new ValidationException(
+                    String.format("The view '%s' cannot be enriched with new options.", this));
+        }
+        return new ContextResolvedTable(
+                objectIdentifier,
+                catalog,
+                ((ResolvedCatalogTable) resolvedTable).copy(newOptions),
+                false);
+    }
+
+    @Override
+    public String toString() {
+        return objectIdentifier.asSummaryString();
+    }
+
+    /**
+     * This method tries to return the connector name of the table, trying to provide a bit more
+     * helpful toString for anonymous tables. It's only to help users to debug, and its return value
+     * should not be relied on.
+     */
+    private static String generateAnonymousStringIdentifier(
+            @Nullable String hint, ResolvedCatalogBaseTable<?> resolvedTable) {
+        // This has been added after a very intensive debugging session, so don't remove it unless
+        // you really know what you're doing.
+        // Planner can do some fancy optimizations' logic squashing two sources together in the same
+        // operator. Because this logic is string based, anonymous tables still need some kind of
+        // unique string based identifier that can be used later by the planner.
+        if (hint == null) {
+            try {
+                hint = resolvedTable.getOptions().get(FactoryUtil.CONNECTOR.key());
+            } catch (Exception ignored) {
+            }
+        }
+
+        int id = uniqueId.incrementAndGet();
+        if (hint == null) {
+            return "anonymous$" + id;
+        }
+
+        return "anonymous_" + hint + "$" + id;

Review comment:
       let's put `*...*` around it to indicate an anonymous object consistently. we do the same for functions and types.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class contains information about a table, its {@link ResolvedSchema}, its options and its
+ * relationship with a {@link Catalog}, if any.
+ *
+ * <p>There can be 3 kinds of {@link ContextResolvedTable}:
+ *
+ * <ul>
+ *   <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and is flagged as temporary.
+ *   <li>An anonymous/inline table: a table which is not stored in a catalog and doesn't have an
+ *       associated unique {@link ObjectIdentifier}.
+ * </ul>
+ *
+ * <p>The different handling of temporary and permanent tables is {@link Catalog} and {@link
+ * CatalogManager} instance specific, hence for these two kind of tables, an instance of this object
+ * represents the relationship between the specific {@link ResolvedCatalogBaseTable} instance and
+ * the specific {@link Catalog}/{@link CatalogManager} instances. For example, the same {@link
+ * ResolvedCatalogBaseTable} can be temporary for one catalog, but permanent for another one.
+ */
+@Internal
+public class ContextResolvedTable {
+
+    private static final AtomicInteger uniqueId = new AtomicInteger(0);
+
+    private final ObjectIdentifier objectIdentifier;
+    private final @Nullable Catalog catalog;
+    private final ResolvedCatalogBaseTable<?> resolvedTable;
+    private final boolean anonymous;
+
+    public static ContextResolvedTable permanent(
+            ObjectIdentifier identifier,
+            Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                identifier, Preconditions.checkNotNull(catalog), resolvedTable, false);
+    }
+
+    public static ContextResolvedTable temporary(
+            ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(identifier, null, resolvedTable, false);
+    }
+
+    public static ContextResolvedTable anonymous(ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                ObjectIdentifier.ofAnonymous(
+                        generateAnonymousStringIdentifier(null, resolvedTable)),
+                null,
+                resolvedTable,
+                true);
+    }
+
+    public static ContextResolvedTable anonymous(
+            String hint, ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                ObjectIdentifier.ofAnonymous(
+                        generateAnonymousStringIdentifier(hint, resolvedTable)),
+                null,
+                resolvedTable,
+                true);
+    }
+
+    private ContextResolvedTable(
+            ObjectIdentifier objectIdentifier,
+            @Nullable Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable,
+            boolean anonymous) {
+        this.objectIdentifier = Preconditions.checkNotNull(objectIdentifier);
+        this.catalog = catalog;
+        this.resolvedTable = Preconditions.checkNotNull(resolvedTable);
+        this.anonymous = anonymous;
+    }
+
+    public boolean isAnonymous() {
+        return this.anonymous;
+    }
+
+    /** @return true if the table is temporary. An anonymous table is always temporary. */
+    public boolean isTemporary() {
+        return catalog == null;
+    }
+
+    public boolean isPermanent() {
+        return !isTemporary();
+    }
+
+    public ObjectIdentifier getIdentifier() {
+        return objectIdentifier;
+    }
+
+    /** Returns empty if {@link #isPermanent()} is false. */
+    public Optional<Catalog> getCatalog() {
+        return Optional.ofNullable(catalog);
+    }
+
+    /** Returns a fully resolved catalog object. */
+    @SuppressWarnings("unchecked")
+    public <T extends ResolvedCatalogBaseTable<?>> T getResolvedTable() {
+        return (T) resolvedTable;
+    }
+
+    public ResolvedSchema getResolvedSchema() {
+        return resolvedTable.getResolvedSchema();
+    }
+
+    /** Returns the original metadata object returned by the catalog. */
+    @SuppressWarnings("unchecked")
+    public <T extends CatalogBaseTable> T getTable() {
+        return (T) resolvedTable.getOrigin();
+    }
+
+    /**
+     * Copy the {@link ContextResolvedTable}, replacing the underlying {@link CatalogTable} options.
+     */
+    public ContextResolvedTable copy(Map<String, String> newOptions) {
+        if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            throw new ValidationException(
+                    String.format("The view '%s' cannot be enriched with new options.", this));
+        }
+        return new ContextResolvedTable(
+                objectIdentifier,
+                catalog,
+                ((ResolvedCatalogTable) resolvedTable).copy(newOptions),
+                false);
+    }
+
+    @Override
+    public String toString() {
+        return objectIdentifier.asSummaryString();
+    }
+
+    /**
+     * This method tries to return the connector name of the table, trying to provide a bit more
+     * helpful toString for anonymous tables. It's only to help users to debug, and its return value
+     * should not be relied on.
+     */
+    private static String generateAnonymousStringIdentifier(
+            @Nullable String hint, ResolvedCatalogBaseTable<?> resolvedTable) {
+        // This has been added after a very intensive debugging session, so don't remove it unless

Review comment:
       remove this and the following line, people should always know what they are doing in this code base

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -157,7 +157,7 @@ public static DynamicTableSource createDynamicTableSource(
         } catch (Throwable t) {
             throw new ValidationException(
                     String.format(
-                            "Unable to create a source for reading table '%s'.\n\n"
+                            "Unable to create a source for reading the table '%s'.\n\n"

Review comment:
       not a typo: this is similar to http://www.englishteachermelanie.com/grammar-when-not-to-use-the-definite-article/ point 5

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java
##########
@@ -107,8 +109,20 @@ public StatementSet addInsert(
         final TableDescriptor updatedDescriptor =
                 targetDescriptor.toBuilder().schema(schemaTranslationResult.getSchema()).build();
 
-        tableEnvironment.createTemporaryTable(path, updatedDescriptor);
-        return addInsert(path, table, overwrite);
+        final ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
+                tableEnvironment
+                        .getCatalogManager()
+                        .resolveCatalogBaseTable(updatedDescriptor.toCatalogTable());

Review comment:
       can be `resolveCatalogTable `

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
##########
@@ -593,8 +587,25 @@ public TableResult executeInsert(TableDescriptor descriptor, boolean overwrite)
         final TableDescriptor updatedDescriptor =
                 descriptor.toBuilder().schema(schemaTranslationResult.getSchema()).build();
 
-        tableEnvironment.createTemporaryTable(path, updatedDescriptor);
-        return executeInsert(path, overwrite);
+        final ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
+                tableEnvironment
+                        .getCatalogManager()
+                        .resolveCatalogBaseTable(updatedDescriptor.toCatalogTable());

Review comment:
       can be `resolveCatalogTable `

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
##########
@@ -540,9 +538,11 @@ public Table from(String path) {
     public Table from(TableDescriptor descriptor) {
         Preconditions.checkNotNull(descriptor, "Table descriptor must not be null.");
 
-        final String path = TableDescriptorUtil.getUniqueAnonymousPath();
-        createTemporaryTableInternal(UnresolvedIdentifier.of(path), descriptor);
-        return from(path);
+        final ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
+                catalogManager.resolveCatalogBaseTable(descriptor.toCatalogTable());

Review comment:
       can be `resolveCatalogTable `

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class contains information about a table, its {@link ResolvedSchema}, its options and its
+ * relationship with a {@link Catalog}, if any.
+ *
+ * <p>There can be 3 kinds of {@link ContextResolvedTable}:
+ *
+ * <ul>
+ *   <li>A permanent table: a table which is stored in a {@link Catalog} and has an associated
+ *       unique {@link ObjectIdentifier}.
+ *   <li>A temporary table: a table which is stored in the {@link CatalogManager}, has an associated
+ *       unique {@link ObjectIdentifier} and is flagged as temporary.
+ *   <li>An anonymous/inline table: a table which is not stored in a catalog and doesn't have an
+ *       associated unique {@link ObjectIdentifier}.
+ * </ul>
+ *
+ * <p>The different handling of temporary and permanent tables is {@link Catalog} and {@link
+ * CatalogManager} instance specific, hence for these two kind of tables, an instance of this object
+ * represents the relationship between the specific {@link ResolvedCatalogBaseTable} instance and
+ * the specific {@link Catalog}/{@link CatalogManager} instances. For example, the same {@link
+ * ResolvedCatalogBaseTable} can be temporary for one catalog, but permanent for another one.
+ */
+@Internal
+public class ContextResolvedTable {
+
+    private static final AtomicInteger uniqueId = new AtomicInteger(0);
+
+    private final ObjectIdentifier objectIdentifier;
+    private final @Nullable Catalog catalog;
+    private final ResolvedCatalogBaseTable<?> resolvedTable;
+    private final boolean anonymous;
+
+    public static ContextResolvedTable permanent(
+            ObjectIdentifier identifier,
+            Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                identifier, Preconditions.checkNotNull(catalog), resolvedTable, false);
+    }
+
+    public static ContextResolvedTable temporary(
+            ObjectIdentifier identifier, ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(identifier, null, resolvedTable, false);
+    }
+
+    public static ContextResolvedTable anonymous(ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                ObjectIdentifier.ofAnonymous(
+                        generateAnonymousStringIdentifier(null, resolvedTable)),
+                null,
+                resolvedTable,
+                true);
+    }
+
+    public static ContextResolvedTable anonymous(
+            String hint, ResolvedCatalogBaseTable<?> resolvedTable) {
+        return new ContextResolvedTable(
+                ObjectIdentifier.ofAnonymous(
+                        generateAnonymousStringIdentifier(hint, resolvedTable)),
+                null,
+                resolvedTable,
+                true);
+    }
+
+    private ContextResolvedTable(
+            ObjectIdentifier objectIdentifier,
+            @Nullable Catalog catalog,
+            ResolvedCatalogBaseTable<?> resolvedTable,
+            boolean anonymous) {
+        this.objectIdentifier = Preconditions.checkNotNull(objectIdentifier);
+        this.catalog = catalog;
+        this.resolvedTable = Preconditions.checkNotNull(resolvedTable);
+        this.anonymous = anonymous;
+    }
+
+    public boolean isAnonymous() {
+        return this.anonymous;
+    }
+
+    /** @return true if the table is temporary. An anonymous table is always temporary. */
+    public boolean isTemporary() {
+        return catalog == null;
+    }
+
+    public boolean isPermanent() {
+        return !isTemporary();
+    }
+
+    public ObjectIdentifier getIdentifier() {
+        return objectIdentifier;
+    }
+
+    /** Returns empty if {@link #isPermanent()} is false. */
+    public Optional<Catalog> getCatalog() {
+        return Optional.ofNullable(catalog);
+    }
+
+    /** Returns a fully resolved catalog object. */
+    @SuppressWarnings("unchecked")
+    public <T extends ResolvedCatalogBaseTable<?>> T getResolvedTable() {
+        return (T) resolvedTable;
+    }
+
+    public ResolvedSchema getResolvedSchema() {
+        return resolvedTable.getResolvedSchema();
+    }
+
+    /** Returns the original metadata object returned by the catalog. */
+    @SuppressWarnings("unchecked")
+    public <T extends CatalogBaseTable> T getTable() {
+        return (T) resolvedTable.getOrigin();
+    }
+
+    /**
+     * Copy the {@link ContextResolvedTable}, replacing the underlying {@link CatalogTable} options.
+     */
+    public ContextResolvedTable copy(Map<String, String> newOptions) {
+        if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            throw new ValidationException(
+                    String.format("The view '%s' cannot be enriched with new options.", this));

Review comment:
       "View '%s' cannot be enriched with new options. Hints can only be applied to tables."

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableASOperation.java
##########
@@ -26,25 +26,32 @@
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.function.Supplier;
 
 /** Operation to describe a CREATE TABLE AS statement. */
 @Internal
 public class CreateTableASOperation implements CreateOperation {
 
     private final CreateTableOperation createTableOperation;
-    private final SinkModifyOperation insertOperation;
+    private final Supplier<SinkModifyOperation> insertOperationFactory;
+
+    private SinkModifyOperation insertOperation;

Review comment:
       then simplify the summary string? It seems not really a summary anyways looking at `getCatalogTable().toProperties()`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
##########
@@ -80,28 +79,28 @@ public static RelNode convertDataStreamToRel(
             boolean isBatchMode,
             ReadableConfig config,
             FlinkRelBuilder relBuilder,
-            ObjectIdentifier identifier,
-            ResolvedSchema schema,
+            ContextResolvedTable contextResolvedTable,
             DataStream<?> dataStream,
             DataType physicalDataType,
             boolean isTopLevelRecord,
             ChangelogMode changelogMode) {
-        final CatalogTable unresolvedTable = new InlineCatalogTable(schema);
-        final ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(unresolvedTable, schema);
         final DynamicTableSource tableSource =
                 new ExternalDynamicSource<>(
-                        identifier, dataStream, physicalDataType, isTopLevelRecord, changelogMode);
+                        dataStream, physicalDataType, isTopLevelRecord, changelogMode);
         final FlinkStatistic statistic =
                 FlinkStatistic.builder()
                         // this is a temporary solution, FLINK-15123 will resolve this

Review comment:
        You added another one below. This comment exists 3 times in the code base now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org