Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/09 08:56:13 UTC

[GitHub] [iceberg] wuwenchi opened a new issue, #5000: Proposal: FlinkSQL supports partition transform by computed columns

wuwenchi opened a new issue, #5000:
URL: https://github.com/apache/iceberg/issues/5000

   # Goal
   Flink SQL should support creating tables with hidden partitions.
   
   # Example
   Create a table with hidden partitions:
   
   ```sql
   CREATE TABLE tb (
     ts TIMESTAMP,
     id INT,
     prop STRING,
     par_ts AS days(ts),                --- transform partition: day
     par_prop AS truncates(6, prop)     --- transform partition: truncate
   ) PARTITIONED BY (
     par_ts, id, par_prop               --- use transform/identity partition
   );
   ```
   
   # Supported Functions
   
   ```
   years (col)
   months (col)
   days (col)
   hours (col)
   truncates (width, col)
   buckets (width, col)
   ```
   
   # Solution
   1. We have created a corresponding UDF for each partition transform and registered these UDFs in the catalog, so the functions can be used directly.
   2. Restriction: because computed columns are not currently supported, any computed column in the DDL must be one of the partition keys.
        (PR #4625 adds support for computed columns; if that feature is merged, this restriction is no longer needed.)
   3. By analyzing the expression of the computed column, we can get all the information about the partition key:
     - The name of the computed column is used as the name of the partition field.
     - The function name in the computed column's expression maps to the transform.
     - The arguments in the computed column's expression correspond to the arguments of the transform, including the source column and, where applicable, the width.
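   
   The analysis in step 3 could be sketched roughly as follows. This is only an illustration: `TransformExpression` is a hypothetical helper, not a class from Iceberg, Flink, or the actual patch, and it assumes the computed column's expression is available as a plain string such as `days(ts)` or `truncates(6, prop)`.
   
   ```java
   import java.util.Locale;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;
   
   // Hypothetical helper for illustration; not part of Iceberg or Flink.
   final class TransformExpression {
       // Matches a single call such as days(ts) or truncates(6, prop).
       private static final Pattern CALL =
           Pattern.compile("\\s*(\\w+)\\s*\\(\\s*([^()]*?)\\s*\\)\\s*");
   
       final String function;      // e.g. "days", "truncates"
       final Integer width;        // null for years/months/days/hours
       final String sourceColumn;  // e.g. "ts", "prop"
   
       private TransformExpression(String function, Integer width, String sourceColumn) {
           this.function = function;
           this.width = width;
           this.sourceColumn = sourceColumn;
       }
   
       static TransformExpression parse(String expression) {
           Matcher m = CALL.matcher(expression);
           if (!m.matches()) {
               throw new IllegalArgumentException("Not a transform call: " + expression);
           }
           String function = m.group(1).toLowerCase(Locale.ROOT);
           String[] args = m.group(2).split("\\s*,\\s*", -1);
           switch (function) {
               case "years":
               case "months":
               case "days":
               case "hours":
                   // Time transforms take exactly one argument: the source column.
                   if (args.length != 1 || args[0].isEmpty()) {
                       throw new IllegalArgumentException(function + " expects (col): " + expression);
                   }
                   return new TransformExpression(function, null, args[0]);
               case "truncates":
               case "buckets":
                   // The width comes first, then the source column.
                   if (args.length != 2 || args[1].isEmpty()) {
                       throw new IllegalArgumentException(function + " expects (width, col): " + expression);
                   }
                   return new TransformExpression(function, Integer.parseInt(args[0]), args[1]);
               default:
                   throw new IllegalArgumentException("Unknown transform: " + function);
           }
       }
   }
   ```
   
   With this sketch, `TransformExpression.parse("truncates(6, prop)")` yields the function `truncates`, width `6`, and source column `prop`, while `years(13, col)` is rejected because the time transforms accept only a column.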


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] github-actions[bot] commented on issue #5000: Proposal: FlinkSQL supports partition transform by computed columns

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #5000:
URL: https://github.com/apache/iceberg/issues/5000#issuecomment-1345701702

   This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.




[GitHub] [iceberg] wuwenchi commented on issue #5000: Proposal: FlinkSQL supports partition transform by computed columns

Posted by GitBox <gi...@apache.org>.
wuwenchi commented on issue #5000:
URL: https://github.com/apache/iceberg/issues/5000#issuecomment-1153575379

   > We'd need to make sure everything on read happens properly as I'm not sure if Flink SQL will map the names of the fields to their designated partition transforms the way Spark does. Is that why we need the UDFs? Is it normal to preprocess the Flink input or does one typically do that via Calcite or does Flink provide mechanisms / interfaces that we might propose to them to make transforms supported?
   
   1. The difference from Spark is that in Spark the partition field name is generated by Iceberg by default, while in Flink the user specifies the name through the computed column, so the partition field uses the user-specified name.
   For example, with `CREATE TABLE tb (ts timestamp) USING iceberg PARTITIONED BY (years(ts))` in Spark, the partition field name defaults to `ts_year`.
   In Flink, with `CREATE TABLE tb (ts timestamp, pts AS years(ts)) PARTITIONED BY (pts)`, the partition field name is `pts`.
   2. Why we use UDFs:
   a. Flink SQL does not support functions in the `PARTITIONED BY` clause, so we put the functions in computed columns; the function names correspond one-to-one to Iceberg's transforms.
   b. UDFs constrain user input to some extent. For example, users can write `years(col)` but not `years(13, col)`.
   c. As you said, a UDF lets us compute the partition value of a datum directly, such as `select buckets(10, 'abc')`.
   
   Of course, the above could perhaps be implemented with Calcite, but I am not particularly familiar with Calcite, the implementation may be more complicated, and it may also require cooperation from the Flink community, so we prefer this simpler approach.
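   
   To illustrate what such a function would return for a single value, here is a plain-Java sketch of the truncate transform as I understand it from the Iceberg spec (integers are rounded down to a multiple of the width, strings are cut to at most `width` code points). `TruncateSketch` is a hypothetical name; it is not the UDF from this proposal and is deliberately not tied to Flink's UDF classes.
   
   ```java
   // Hypothetical illustration of truncate semantics; not the proposed UDF itself.
   final class TruncateSketch {
       private TruncateSketch() {}
   
       // Rounds value down to the nearest multiple of width, also for negatives.
       static int truncate(int width, int value) {
           requirePositive(width);
           // floorMod keeps the remainder non-negative when width is positive.
           return value - Math.floorMod(value, width);
       }
   
       // Keeps at most width Unicode code points of value.
       static String truncate(int width, String value) {
           requirePositive(width);
           int codePoints = value.codePointCount(0, value.length());
           if (codePoints <= width) {
               return value;
           }
           // offsetByCodePoints avoids splitting a surrogate pair.
           return value.substring(0, value.offsetByCodePoints(0, width));
       }
   
       private static void requirePositive(int width) {
           if (width <= 0) {
               throw new IllegalArgumentException("width must be positive: " + width);
           }
       }
   }
   ```
   
   For example, `TruncateSketch.truncate(6, "iceberg")` returns `"iceber"`, and `TruncateSketch.truncate(10, -1)` returns `-10`.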
   




[GitHub] [iceberg] kbendick commented on issue #5000: Proposal: FlinkSQL supports partition transform by computed columns

Posted by GitBox <gi...@apache.org>.
kbendick commented on issue #5000:
URL: https://github.com/apache/iceberg/issues/5000#issuecomment-1153064999

   If we could get Flink to support partition transforms as a first class concept, probably via Calcite, that would be super as it would make it so we could push the functions into the `partitioned by` clause itself. Or is that already possible?
   
   If it's not already possible, do you happen to have the Calcite experience or know where in Flink such a change might be made? If it's not too difficult to make a POC, we might present it to the Flink community and raise a FLIP?
   
   Let me know if you know more in this area @wuwenchi




[GitHub] [iceberg] kbendick commented on issue #5000: Proposal: FlinkSQL supports partition transform by computed columns

Posted by GitBox <gi...@apache.org>.
kbendick commented on issue #5000:
URL: https://github.com/apache/iceberg/issues/5000#issuecomment-1153065692

   My original concern with https://github.com/apache/iceberg/pull/4625 is that it doesn't seem like something that would work well for interoperability with other engines.
   
   For example, many people ingest data into an Iceberg-based data lake using Flink and then run batch ETL using Spark. Having the `flink.` configurations in the table could cause issues, for example if users created the table via Hive or Spark and those configs were not present. Is that a concern?
   
   I _love_ the idea of registered functions (though I would love it even more if we could get a POC of further integration with Flink on this, possibly needing to use the newer Schema classes on the Calcite / Flink side and then ideally casting from ResolvedSchema etc., just for a POC).
   
   I'm admittedly not sure if that's a realistic ask, timewise. I have not looked into extending Flink's table / catalog functionality enough to support this natively, but I do know some people we could talk to if we had an idea 🙂 




[GitHub] [iceberg] kbendick commented on issue #5000: Proposal: FlinkSQL supports partition transform by computed columns

Posted by GitBox <gi...@apache.org>.
kbendick commented on issue #5000:
URL: https://github.com/apache/iceberg/issues/5000#issuecomment-1154158916

   Also, can you elaborate more on why this restriction exists?
   
   > Restriction: Because computed columns are not currently supported, when there are computed columns in DDL, these computed columns must be in partition keys.
   
   If somebody uses a generated column for a field, why does it have to be a partition field? What am I missing that imposes that requirement?




[GitHub] [iceberg] github-actions[bot] commented on issue #5000: Proposal: FlinkSQL supports partition transform by computed columns

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #5000:
URL: https://github.com/apache/iceberg/issues/5000#issuecomment-1368130488

   This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'




[GitHub] [iceberg] kbendick commented on issue #5000: Proposal: FlinkSQL supports partition transform by computed columns

Posted by GitBox <gi...@apache.org>.
kbendick commented on issue #5000:
URL: https://github.com/apache/iceberg/issues/5000#issuecomment-1154123892

   So far, I want to say that I like a lot about where this is going @wuwenchi.
   
   I think working on the UDF transforms would be a good first step (as those would have benefit regardless).
   
   > I think this has no effect on other engines, because watermarks and computed columns themselves do not actually store data, they just will add some logical processing when querying, but these logics only take effect on flink. What is actually stored is the data of the original physical columns, and the related format has not changed.
   
   My one concern here is that users often write with one engine, say Flink, and later process the data with other engines. People might want these computed columns reflected in the data (possibly even stored, though for partition transforms and partitions in general, the partition field’s value isn’t generally stored multiple times).
   
   It might be the case we cannot do certain things other than with Flink. Watermarks might be one of those. Though there could be steps we could take to make as much of this information available to downstream consumers as possible.
   
   For example, it has been discussed before to use the iceberg sequence ID as a form of a watermark (as it’s generally monotonically increasing). While other engines might not have native support for watermarks in them, at least having the data available would be beneficial.
   
   TLDR - Again, I think the UDFs you mentioned would be great to work on first, as those have value regardless of how we proceed next (for example, users might want to query with Iceberg’s bucket function as a way to more narrowly specify a subset of data to perform an action on).
   
   > Of course, maybe the above things can be implemented with Calcite, but since I am not particularly familiar with Calcite, the implementation may be more complicated, and it may also require the cooperation of flink, so we prefer to use this simpler way.
   
   I don’t have much knowledge of Calcite either, but in the more medium to long term, I think it would be good to reach out to the Flink community to possibly have some of these concepts more natively supported.
   
   My concern with using names directly to infer the function is that many users might already have columns with those names (`_years`, etc.) in their data.
   
   But with the UDF approach, that issue goes away (users can already choose the partition field name in Spark, for example with `ALTER TABLE … ADD PARTITION FIELD bucket(16, id) AS shard`).
   
   Would you be interested in doing a POC or PR of just the transformation functions at first? Then at the community sync up we can possibly bring this up and form a working group to get input from others on this subject 🙂 




[GitHub] [iceberg] kbendick commented on issue #5000: Proposal: FlinkSQL supports partition transform by computed columns

Posted by GitBox <gi...@apache.org>.
kbendick commented on issue #5000:
URL: https://github.com/apache/iceberg/issues/5000#issuecomment-1153064660

   If we can register these functions in the catalog, I think this is a great idea.
   
   And they can ideally be stored as normal `PartitionField`s inside of the `PartitionSpec`.
   
   We'd need to make sure everything on read happens properly as I'm not sure if Flink SQL will map the names of the fields to their designated partition transforms the way Spark does. Is that why we need the UDFs? Is it normal to preprocess the Flink input or does one typically do that via Calcite or does Flink provide mechanisms / interfaces that we might propose to them to make transforms supported?
   
   I think this would be really useful, though I'd like to see it happen as cleanly as possible and with interop with Spark (and any other engines that support transforms as well).
   
   Though I'm 100% for discussing ways to do this regardless to make transforms a reality.
   
   The functions look useful regardless, like if a table is partitioned on bucketing and a user simply wanted to query it from Flink they might be needed. Is that something you'd be able to work on @wuwenchi?
   
   Let me read over #4625 again. It's been a bit since I've looked at it so please give me a few days (or reach out on Slack if you don't hear from me in a few days) but I do like where this is heading from your sample 🙂 




[GitHub] [iceberg] wuwenchi commented on issue #5000: Proposal: FlinkSQL supports partition transform by computed columns

Posted by GitBox <gi...@apache.org>.
wuwenchi commented on issue #5000:
URL: https://github.com/apache/iceberg/issues/5000#issuecomment-1153576463

   > My original concern with #4625 is that it doesn't seem like something that would work well for interoperability with other engines.
   
   Other engines do not recognize watermarks and computed columns, so when a non-Flink engine is used, the watermark and computed-column properties are ignored.
   I think this has no effect on other engines, because watermarks and computed columns do not actually store data; they only add logical processing at query time, and that logic only takes effect in Flink. What is actually stored is the data of the original physical columns, and the storage format has not changed.
   This is similar to the `write.upsert.enabled` property: if it is set, Flink writes in upsert mode, while other engines simply ignore the property when writing data.
   
   




[GitHub] [iceberg] wuwenchi commented on issue #5000: Proposal: FlinkSQL supports partition transform by computed columns

Posted by GitBox <gi...@apache.org>.
wuwenchi commented on issue #5000:
URL: https://github.com/apache/iceberg/issues/5000#issuecomment-1150860594

   Looking forward to your feedback!
   @yittg @rdblue @openinx @kbendick 
   based on #4251
   




[GitHub] [iceberg] kbendick commented on issue #5000: Proposal: FlinkSQL supports partition transform by computed columns

Posted by GitBox <gi...@apache.org>.
kbendick commented on issue #5000:
URL: https://github.com/apache/iceberg/issues/5000#issuecomment-1154156417

   I guess TLDR
   
   > The difference from spark here is that the partition field name in spark is generated by iceberg by default, while flink can specify the field name in the computed column, so the partition field name uses the name specified by the user.
   
   It would be nice if the name could be autogenerated (with the option to override it), but I personally wouldn’t make that a blocker for moving forward if we had the UDFs.
   
   I am generally not a fan of using the names to infer specific information, as Iceberg tends to use numeric IDs for everything to help avoid a number of cases.
   
   For example, if I have a field `referralId` and it were in a flink.computed-columns field or something, and then I dropped it and later added a new field `referralId`, that new field could very well represent something else. This is one of the big reasons we use numeric identifiers for fields: they can be renamed, and situations like the one mentioned don’t cause issues (I have seen that exact situation cause issues, especially when you don’t control the field name, e.g. when it comes from a 3rd party).
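   
   A toy model of the drop-and-re-add case might make the point concrete. `ToySchema` below is a hypothetical class written only for this comment; it is not Iceberg's `Schema` API, it just assumes IDs are handed out from an increasing counter and never reused.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   // Hypothetical toy schema; illustrates ID-based identity, not Iceberg's real API.
   final class ToySchema {
       private final Map<String, Integer> idsByName = new LinkedHashMap<>();
       private int lastAssignedId = 0;
   
       // Adds a field and returns its freshly assigned, never reused ID.
       int addField(String name) {
           if (idsByName.containsKey(name)) {
               throw new IllegalArgumentException("Field already exists: " + name);
           }
           int id = ++lastAssignedId;
           idsByName.put(name, id);
           return id;
       }
   
       // Drops a field; its ID is retired and never handed out again.
       void dropField(String name) {
           if (idsByName.remove(name) == null) {
               throw new IllegalArgumentException("No such field: " + name);
           }
       }
   
       // Renames a field while keeping its ID, so references by ID stay valid.
       void renameField(String from, String to) {
           if (idsByName.containsKey(to)) {
               throw new IllegalArgumentException("Field already exists: " + to);
           }
           Integer id = idsByName.remove(from);
           if (id == null) {
               throw new IllegalArgumentException("No such field: " + from);
           }
           idsByName.put(to, id);
       }
   
       // Returns the field's ID, or null if the name is not present.
       Integer idOf(String name) {
           return idsByName.get(name);
       }
   }
   ```
   
   Adding `referralId`, dropping it, and adding `referralId` again produces two different IDs, so anything that referenced the first field by ID cannot silently bind to the second one, whereas a name-based reference would.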




[GitHub] [iceberg] wuwenchi commented on issue #5000: Proposal: FlinkSQL supports partition transform by computed columns

Posted by GitBox <gi...@apache.org>.
wuwenchi commented on issue #5000:
URL: https://github.com/apache/iceberg/issues/5000#issuecomment-1155079167

   
   > For example, if I have a field referralId and it were in a flink.computed-columns field or something, and then I dropped it and later added a new field referralId, that new field very well potentially represent something else.
   >
   > It might be the case that people want these computed columns reflected in the data (possibly even stored, though in the case of partition transforms and partitions in general, the partition fields value isn’t generally stored multiple times).
   
   Yes. Because Flink cannot currently modify partition fields, I had not fully considered this, and it may cause problems in the future. I'll re-analyze this issue. Thanks for your great ideas!
   
   > If somebody uses a generated column for a field, why does it have to be a partition field?
   
   The current version does not support computed columns: when the DDL contains a computed column, table creation fails as unsupported. Since I only want to derive the partition transform from the computed column, I did not enable all computed columns, only those that can become partition transforms; other kinds of computed columns still produce the original error.
   
   > Would you be interested in doing a POC or PR of just the transformation functions at first?
   
   Glad to do this! Can I contact you on Slack directly when I have ideas or questions?
   
   




[GitHub] [iceberg] wuwenchi commented on issue #5000: Proposal: FlinkSQL supports partition transform by computed columns

Posted by GitBox <gi...@apache.org>.
wuwenchi commented on issue #5000:
URL: https://github.com/apache/iceberg/issues/5000#issuecomment-1153572709

   > Ideally, they could be stored in PartitionSpec as normal PartitionFields.
   
   Yes, in fact I already save the specified partition keys in the PartitionSpec.
   When we get the partition keys while creating the Iceberg table, we create the corresponding transforms according to the function name:
   
   ```java
   partitionKeys.forEach(name -> {
       ComputedColumn computedColumn = computedCols.get(name);
       if (computedColumn == null) {
           builder.identity(name);                                               // identity
       } else {
           ...
           switch (udfProp.getFuncName()) {
           case "years":
               builder.year(udfProp.getSrcColumn(), computedColumn.getName());   // transforms
               break;
           case "months":
               builder.month(udfProp.getSrcColumn(), computedColumn.getName());  // transforms
               break;
           ...
           }
       }
   });
   ```

