You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/11 02:03:00 UTC

[jira] [Commented] (KAFKA-6036) Enable logical materialization to physical materialization

    [ https://issues.apache.org/jira/browse/KAFKA-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16645852#comment-16645852 ] 

ASF GitHub Bot commented on KAFKA-6036:
---------------------------------------

guozhangwang opened a new pull request #5779: KAFKA-6036: Local Materialization for Source KTable
URL: https://github.com/apache/kafka/pull/5779
 
 
   Refactor the materialization for source KTables in the way that:
   
   1. If Materialized.as(queryableName) is specified, materialize;
   2. If the downstream operator requires to fetch from this KTable via ValueGetters, materialize;
   3. If the downstream operator requires to send old values, materialize.
   
   Otherwise do not materialize the KTable. E.g. `builder.table("topic").filter().toStream().to("topic")` would not create any state stores.
   
   There's a couple of minor changes along with PR as well:
   
   1. KTableImpl's `queryableStoreName` and `isQueryable` are merged into `queryableStoreName` only, and if it is null it means not queryable. As long as it is not null, it should be queryable (i.e. internally generated names will not be used any more).
   
   To achieve this, splitted `MaterializedInternal.storeName()` and `MaterializedInternal.queryableName()`. The former can be internally generated and will not be exposed to users. QueryableName can be modified to set to the internal store name if we decide to materialize it during the DSL parsing / physical topology generation phase. And only if queryableName is specified the corresponding KTable is determined to be materialized.
   
   2. Found some overlapping unit tests among `KTableImplTest`, and `KTableXXTest`, removed them.
   
   3. There are a few typing bugs found along the way, fixed them as well.
   
   -----------------------
   
   This PR is an illustration of experimenting a poc towards logical materializations.
   
   Today we've logically materialized the KTable for filter / mapValues / transformValues if queryableName is not specified via Materialized, but whenever users specify queryableName we will still always materialize. My original goal is to also consider logically materialize for queryable stores, but when implementing it via a wrapped store to apply the transformations on the fly I realized it is tougher than I thought, because we not only need to support `fetch` or `get`, but also needs to support range queries, `approximateNumEntries`, and `isOpen` etc as well, which are not efficient to support. So in the end I'd suggest we still stick with the rule of always materializing if queryableName is specified, and only consider logical materialization otherwise.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Enable logical materialization to physical materialization
> ----------------------------------------------------------
>
>                 Key: KAFKA-6036
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6036
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>
> Today whenever users specify a queryable store name for KTable, we would always add a physical state store in the translated processor topology.
> For some scenarios, we should consider not physically materialize the KTable but only "logically" materialize it when you have some simple transformation operations or even join operations that generated new KTables, and which needs to be materialized with a state store, you can use the changelog topic of the previous KTable and applies the transformation logic upon restoration instead of creating a new changelog topic. For example:
> {code}
> table1 = builder.table("topic1");
> table2 = table1.filter(..).join(table3); // table2 needs to be materialized for joining
> {code}
> We can actually set the {{getter}} function of table2's materialized store, say {{state2}} to be reading from {{topic1}} and then apply the filter operator, instead of creating a new {{state2-changelog}} topic in this case.
> We can come up with a general internal impl optimizations to determine when to logically materialize, and whether we should actually allow users of DSL to "hint" whether to materialize or not (it then may need a KIP).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)