Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/10/11 15:12:06 UTC

[GitHub] [iceberg] 0xNacho opened a new issue, #5958: Not able to process hundred of RowData types

0xNacho opened a new issue, #5958:
URL: https://github.com/apache/iceberg/issues/5958

   ### Query engine
   
   Flink
   
   ### Question
   
   Hello!
   
   I have a Flink application that reads arbitrary Avro data, maps it to RowData, and uses several FlinkSink instances to write the data into Iceberg tables. By arbitrary data I mean that I have 100 types of Avro messages, all of them sharing a common property "tableName" but containing different columns. I would like to write each of these message types into a separate Iceberg table.
   
   To do this I'm using side outputs: once my data is mapped to RowData, I use a ProcessFunction that writes each message to a specific OutputTag.
   
   Later on, with the DataStream already processed, I loop over the different output tags, get the records using getSideOutput, and create a specific Iceberg sink for each of them. Something like:
   
   
   
   ```java
   
           final List<OutputTag<RowData>> tags = ... // list of all possible output tags
   
           // process() returns a SingleOutputStreamOperator, the type that exposes getSideOutput()
           final SingleOutputStreamOperator<RowData> rowdata = stream
                   .map(new ToRowDataMap()) // map the custom Avro POJO to RowData
                   .uid("map-row-data")
                   .name("Map to RowData")
                   .process(new ProcessRecordFunction(tags)) // send each element to its table's OutputTag
                   .uid("id-process-record")
                   .name("Process Input records");
   
           CatalogLoader catalogLoader = ...
           String upsertField = ...
   
           tags.forEach(tag -> {
               // side output holding the records for this tag's table
               DataStream<RowData> outputStream = rowdata.getSideOutput(tag);
   
               TableIdentifier identifier = TableIdentifier.of("myDBName", tag.getId());
   
               // one Iceberg sink per table
               FlinkSink.Builder builder = FlinkSink
                       .forRowData(outputStream)
                       .table(catalogLoader.loadCatalog().loadTable(identifier))
                       .tableLoader(TableLoader.fromCatalog(catalogLoader, identifier))
                       .set("upsert-enabled", "true")
                       .uidPrefix("commiter-sink-" + tag.getId())
                       .equalityFieldColumns(Collections.singletonList(upsertField));
               builder.append();
           });
   
   ```
   
   It works very well when I'm dealing with a few tables. But when the number of tables scales up, Flink cannot acquire enough task resources to keep the application alive, and it generates a job graph that the Flink UI cannot even display because it needs too many resources.
   
   Is there a more efficient way of doing this? Or maybe some way of optimizing it?
   
   Thanks in advance ! :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] luoyuxia commented on issue #5958: Not able to process hundred of RowData types

Posted by GitBox <gi...@apache.org>.
luoyuxia commented on issue #5958:
URL: https://github.com/apache/iceberg/issues/5958#issuecomment-1296797458

   Actually, I don't think there's a more efficient way. Your job does need that many resources, since you need to write to so many tables.
   Is it possible to allocate more resources to the Flink cluster, or to split the whole job into several smaller jobs?
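
The "several smaller jobs" suggestion could be sketched as follows: split the list of table names into fixed-size groups and launch one job per group, each building sinks only for its own tables. This is a minimal illustration in plain Java, not part of Flink or Iceberg; the class name `TableGroups`, the method `split`, and the group size of 30 are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

public class TableGroups {

    // Splits the table names into consecutive groups of at most groupSize entries.
    // Each group would then be handled by its own, smaller job.
    public static List<List<String>> split(List<String> tableNames, int groupSize) {
        if (groupSize <= 0) {
            throw new IllegalArgumentException("groupSize must be positive");
        }
        List<List<String>> groups = new ArrayList<>();
        for (int start = 0; start < tableNames.size(); start += groupSize) {
            int end = Math.min(start + groupSize, tableNames.size());
            groups.add(new ArrayList<>(tableNames.subList(start, end)));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> tables = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            tables.add("table_" + i); // hypothetical table names
        }
        // 100 tables in groups of 30 -> 4 groups (30, 30, 30, 10)
        List<List<String>> groups = split(tables, 30);
        for (int i = 0; i < groups.size(); i++) {
            System.out.println("job " + i + " writes " + groups.get(i).size() + " tables");
        }
    }
}
```

Each job would still need to filter the input down to its own tables, and a newly added table has to be assigned to some group, so this reduces the size of each job graph without removing the need to redeploy when tables are added.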
   




[GitHub] [iceberg] 0xNacho commented on issue #5958: Not able to process hundred of RowData types

Posted by "0xNacho (via GitHub)" <gi...@apache.org>.
0xNacho commented on issue #5958:
URL: https://github.com/apache/iceberg/issues/5958#issuecomment-1400380196

   @luoyuxia thanks for your reply, and sorry for getting back to you so late!
   
   Yeah, it's possible to split the whole job into several small jobs, but the number of tables is always increasing and I would like to have a generic implementation.
   
   Is there any way to implement a "generic" sink containing a map of <String, IcebergFileCommiter>, where the string is the name of the application? That way I would end up with only one "sink" operator.
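
The shape of such a "generic" sink could be sketched roughly as below: a single component holding a map from a string key to a per-key writer, created lazily the first time a record for that key arrives. This is plain Java for illustration only. `RoutingWriter` and `TableWriter` are hypothetical names, not Iceberg or Flink classes, and the sketch leaves out checkpointing and commit handling, which a real Flink sink would need.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class RoutingWriter<R> {

    // Hypothetical stand-in for whatever per-table writer/committer would be used.
    public interface TableWriter<R> {
        void write(R record);
    }

    private final Function<String, TableWriter<R>> writerFactory;
    private final Map<String, TableWriter<R>> writers = new HashMap<>();

    public RoutingWriter(Function<String, TableWriter<R>> writerFactory) {
        this.writerFactory = writerFactory;
    }

    // Routes the record to the writer registered under key, creating it on first use.
    public void write(String key, R record) {
        writers.computeIfAbsent(key, writerFactory).write(record);
    }

    // Number of writers created so far (one per distinct key seen).
    public int openWriters() {
        return writers.size();
    }

    public static void main(String[] args) {
        // In-memory "tables" so the routing can be observed without any external system.
        Map<String, List<String>> written = new HashMap<>();
        RoutingWriter<String> sink = new RoutingWriter<String>(key -> {
            List<String> rows = written.computeIfAbsent(key, k -> new ArrayList<>());
            return record -> rows.add(record);
        });

        sink.write("orders", "order-1");
        sink.write("users", "user-1");
        sink.write("orders", "order-2");

        System.out.println(sink.openWriters() + " writers, orders=" + written.get("orders"));
    }
}
```

With this layout a new key needs no change to the job graph, since the writer is created on demand inside the one operator; the trade-off is that every writer lives in the same operator and shares its resources.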
   




[GitHub] [iceberg] github-actions[bot] commented on issue #5958: Not able to process hundred of RowData types

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #5958:
URL: https://github.com/apache/iceberg/issues/5958#issuecomment-1666294844

   This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

