Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2023/01/18 18:56:16 UTC

[GitHub] [iceberg] stevenzwu commented on pull request #6614: Flink:fix flink streaming query problem [ Cannot get a client from a closed pool]

stevenzwu commented on PR #6614:
URL: https://github.com/apache/iceberg/pull/6614#issuecomment-1387592902

   @xuzhiwen1255 thanks a lot for the root cause analysis. I agree with your conclusion. Let's discuss what the right fix is, though. I am not sure we should add this extra complexity to `JdbcCatalog` and `ClientPoolImpl`.
   
   I was wondering why this doesn't happen for `HiveCatalog`. It turns out the difference is that `JdbcCatalog` is `Closeable`. The Flink `TableLoader` has the following close behavior, which seems fine to me.
   ```
       @Override
       public void close() throws IOException {
         if (catalog instanceof Closeable) {
           ((Closeable) catalog).close();
         }
       }
   ```
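
   To make the `Closeable` difference concrete, here is a minimal sketch (the classes below are illustrative stand-ins, not the real Iceberg types): only a catalog that implements `Closeable` is actually closed, so the `instanceof` check makes the close a no-op for `HiveCatalog`, while for `JdbcCatalog` it shuts down the underlying client pool for good.
    ```
    import java.io.Closeable;
    import java.io.IOException;

    // Stand-in for HiveCatalog: not Closeable, so the close above is a no-op for it.
    class NotCloseableCatalog {}

    // Stand-in for JdbcCatalog: Closeable, and close() shuts down its client pool.
    class CloseableCatalog implements Closeable {
      private boolean poolClosed = false;

      Object clientFromPool() {
        if (poolClosed) {
          // Mirrors the symptom in this PR's title.
          throw new IllegalStateException("Cannot get a client from a closed pool");
        }
        return new Object();
      }

      @Override
      public void close() throws IOException {
        poolClosed = true;
      }
    }

    class CloseDemo {
      // Same dispatch as the TableLoader.close() above.
      static void closeIfCloseable(Object catalog) throws IOException {
        if (catalog instanceof Closeable) {
          ((Closeable) catalog).close();
        }
      }

      public static void main(String[] args) throws IOException {
        CloseableCatalog jdbc = new CloseableCatalog();
        closeIfCloseable(new NotCloseableCatalog()); // no-op, like HiveCatalog
        closeIfCloseable(jdbc);                      // closes the pool
        jdbc.clientFromPool();                       // throws IllegalStateException
      }
    }
    ```
   So the close itself is fine; the problem only appears when the same, now-closed catalog instance is used again.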
   
   Both `FlinkSource` and the new FLIP-27 `IcebergSource` have the same usage pattern of `TableLoader` during source construction. Here is the code snippet from `FlinkSource`.
   ```
       public FlinkInputFormat buildFormat() {
         Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
   
         Schema icebergSchema;
         FileIO io;
         EncryptionManager encryption;
         if (table == null) {
           // load required fields by table loader.
           tableLoader.open();
           try (TableLoader loader = tableLoader) {
             table = loader.loadTable();
             icebergSchema = table.schema();
             io = table.io();
             encryption = table.encryption();
           } catch (IOException e) {
             throw new UncheckedIOException(e);
           }
      } else {
        // ... (rest of buildFormat elided)
   ```
   
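   To spell out the failure sequence this pattern causes with a closeable catalog (a hedged sketch; the class below is illustrative, not the actual code path): the builder opens the caller's `tableLoader` and the try-with-resources closes it together with its `JdbcCatalog` client pool, so when the very same loader instance is reused later at runtime, e.g. by the streaming monitor, the next `loadTable()` hits the already-closed pool.
    ```
    import java.io.Closeable;

    // Illustrative loader, not the real TableLoader: close() permanently
    // closes the backing catalog's client pool.
    class SketchLoader implements Closeable {
      private boolean poolClosed = false;

      void open() { /* re-opening does not revive the shared, closed pool */ }

      String loadTable() {
        if (poolClosed) {
          throw new IllegalStateException("Cannot get a client from a closed pool");
        }
        return "table";
      }

      @Override
      public void close() {
        poolClosed = true;
      }

      public static void main(String[] args) {
        SketchLoader tableLoader = new SketchLoader();

        // Step 1: the source builder loads the table once and closes the loader.
        tableLoader.open();
        try (SketchLoader loader = tableLoader) {
          loader.loadTable(); // fine
        }

        // Step 2: the same loader instance is reused later at runtime.
        tableLoader.open();
        tableLoader.loadTable(); // throws: the pool was closed in step 1
      }
    }
    ```
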
   I guess issue #6455 is probably the first report of using a closeable `JdbcCatalog` with the Flink source. Both `FlinkSource` and the FLIP-27 `IcebergSource` need to load a `Table` object to extract fields like schema, io, etc. 
   
   This issue/PR pointed out the problem of reusing the `TableLoader` object for the internal table loading inside `FlinkSource` or `IcebergSource`. One alternative fix is to add a `clone()` method to the Flink `TableLoader` interface, as sketched below.
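
   A rough sketch of that alternative (the shapes below are assumed, not a final design): `clone()` returns an independent, not-yet-opened loader backed by its own catalog instance, and the builder loads the table from a throwaway clone, so the caller's loader is never closed.
    ```
    import java.io.Closeable;
    import java.io.IOException;
    import java.io.Serializable;
    import java.io.UncheckedIOException;

    // Assumed shape of the interface change; not the actual Iceberg code.
    interface CloneableTableLoader extends Closeable, Serializable {
      void open();

      Object loadTable(); // returns the Iceberg Table in the real interface

      // Proposed addition: each call yields an independent, not-yet-opened
      // loader backed by its own catalog instance.
      CloneableTableLoader clone();
    }

    class BuilderSketch {
      // Hypothetical replacement for the internal load in buildFormat():
      // only the clone (and the catalog behind it) is closed here.
      static Object loadTableOnce(CloneableTableLoader tableLoader) {
        try (CloneableTableLoader loader = tableLoader.clone()) {
          loader.open();
          return loader.loadTable();
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }
    }
    ```
   The open design question is then whether every `TableLoader` implementation can cheaply create an independent catalog instance, which is part of why this is a bigger change than patching `ClientPoolImpl`.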
   
   This is a more fundamental change. Hence, I would like to get more feedback from @hililiwei @pvary @rdblue .
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org