Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/04/29 15:49:02 UTC

[GitHub] [druid] a2l007 opened a new issue #9791: Introduce extensibility support for Datasources

a2l007 opened a new issue #9791:
URL: https://github.com/apache/druid/issues/9791


   ### Description
   
   Introduce an extension point for the `DataSource` interface that allows custom datasource implementations. This extension point applies only to datasource types that handle multiple table datasources. Within core Druid, `UnionDataSource` is one such type.
   The extension point would look something like this:
   
   ```
   public interface MultiDataSource extends DataSource
   {
    List<TimelineObjectHolder<String, ServerSelector>> getSegmentsFromIntervals(
        Query baseQuery,
        Map<String, ? extends TimelineLookup<String, ServerSelector>> timelineMap
    );
   }
   ```
   Implementations of this interface decide which segments are selected from which base table datasources for the given set of intervals. For every query, the broker must determine the list of segments to query on the various queryable nodes. If the datasource of the incoming query is a `MultiDataSource`, the broker can defer segment selection to its `getSegmentsFromIntervals` method.
   The `UnionDataSource` can be refactored to be an implementation of this interface and the `getSegmentsFromIntervals` method would return all the segments from all the datasources defined in the union query. To clarify, all the existing DataSource implementations except UnionDataSource would be unaffected. 
   
   To support this, the segment selection and related query processing logic would start handling multiple `TableDataSource`s. `TimelineServerView.getTimeline` would be refactored to return a map of datasource -> `TimelineLookup`, something like:
   ```
   Optional<? extends Map<String, ? extends TimelineLookup<String, ServerSelector>>>
    getTimeline(DataSourceAnalysis analysis);
   ```
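   
   As a rough illustration of a lookup that returns one timeline per table, here is a self-contained sketch. `TimelineMapSketch` and `getTimelines` are hypothetical names, a timeline is reduced to a plain list of segment ids, and the rule that unknown tables are skipped (with an empty result only when none are known) is an assumption rather than something this proposal specifies.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;
   
   // Hypothetical stand-in for the broker's per-table timeline registry; not a Druid class.
   final class TimelineMapSketch
   {
     // Table name -> timeline. A timeline is reduced here to a list of segment ids.
     private final Map<String, List<String>> timelinesByTable;
   
     TimelineMapSketch(Map<String, List<String>> timelinesByTable)
     {
       this.timelinesByTable = timelinesByTable;
     }
   
     // Returns the timelines of the requested tables that this view knows about.
     // Assumption: tables without a timeline are skipped, and the result is
     // empty only when none of the requested tables has a timeline.
     Optional<Map<String, List<String>>> getTimelines(List<String> requestedTables)
     {
       Map<String, List<String>> found = new LinkedHashMap<>();
       for (String table : requestedTables) {
         List<String> timeline = timelinesByTable.get(table);
         if (timeline != null) {
           found.put(table, timeline);
         }
       }
       return found.isEmpty() ? Optional.empty() : Optional.of(found);
     }
   }
   ```
   
   Keying the result by table name is what would let a `MultiDataSource` implementation see every candidate table at once when it selects segments.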
   
   ### Motivation
   
   - This feature can help users with validation workflows where changes can be tested on a non-production datasource before being applied to the production datasource. A detailed example of the use case is described in #5350.
   
   - Union queries are currently executed serially: for a list of datasources, the broker executes the same query against each datasource one after another. If the Union data source implementation is refactored to implement MultiDataSource, the broker can identify segments to be retrieved from all the requested datasources at the same time therefore the queries to the historical will now include segment specs from all the datasources requested in the union query. This can eliminate the performance bottleneck associated with serial execution.
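   
   The single-pass idea in the second point can be sketched as follows. This is only an illustration under simplifying assumptions: `SinglePassSketch`, `SelectedSegment` and `requestsByServer` are hypothetical names, and a segment is reduced to a table name, a segment id and the server that holds it.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;
   
   // Hypothetical illustration; none of these names are Druid classes.
   final class SinglePassSketch
   {
     // A segment the broker selected, tagged with its table and the server holding it.
     static final class SelectedSegment
     {
       final String table;
       final String segmentId;
       final String server;
   
       SelectedSegment(String table, String segmentId, String server)
       {
         this.table = table;
         this.segmentId = segmentId;
         this.server = server;
       }
     }
   
     // Groups every selected segment by server, regardless of table, so that
     // each server receives one request covering all tables in the union.
     static Map<String, List<String>> requestsByServer(List<SelectedSegment> selected)
     {
       Map<String, List<String>> byServer = new TreeMap<>();
       for (SelectedSegment s : selected) {
         byServer.computeIfAbsent(s.server, k -> new ArrayList<>())
                 .add(s.table + "/" + s.segmentId);
       }
       return byServer;
     }
   }
   ```
   
   With segments from `table1` and `table2` on the same historical, the grouping yields a single entry for that server instead of one request per table.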
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on issue #9791: Introduce extensibility support for Datasources

Posted by GitBox <gi...@apache.org>.
jihoonson commented on issue #9791:
URL: https://github.com/apache/druid/issues/9791#issuecomment-656473700


   > Apart from the UnionDataSource redesign, one other useful MultiDataSource implementation would be something like a Priority list datasource that accepts an ordered list of tables.
   For example if the tables are `table1` and `table2`, the priority list datasource would attempt to serve every query out of `table1` and would use `table2` only for intervals with missing segments on `table1`.
   
   Could you explain in more detail? It would help others understand the design choice of `MultiDataSource`. It seems like this datasource will involve multiple roundtrips in a single query. How do the broker and historicals interact with each other? Also, how do you tell that some segments are missing in `table1`?




[GitHub] [druid] jihoonson commented on issue #9791: Introduce extensibility support for Datasources

Posted by GitBox <gi...@apache.org>.
jihoonson commented on issue #9791:
URL: https://github.com/apache/druid/issues/9791#issuecomment-792072618


   @a2l007 thanks for the detailed explanation with an example. It sounds reasonable to me.




[GitHub] [druid] a2l007 edited a comment on issue #9791: Introduce extensibility support for Datasources

Posted by GitBox <gi...@apache.org>.
a2l007 edited a comment on issue #9791:
URL: https://github.com/apache/druid/issues/9791#issuecomment-622376887


   @clintropolis Thanks for reviewing this.
   In order for the broker to support this, `CachingClusteredClient` will have to start handling a map of Datasource -> Timeline instead of just a Timeline object. Apart from this, the only other significant change would be something like this in the `computeSegmentsToQuery` method:
   
   ```
   private Set<SegmentServerSelector> computeSegmentsToQuery(
       Map<String, TimelineLookup<String, ServerSelector>> timeline
   )
   {
     final List<TimelineObjectHolder<String, ServerSelector>> serversLookup;
     if (query.getDataSource() instanceof MultiDataSource) {
       List<TimelineObjectHolder<String, ServerSelector>> multiServersLookup =
           ((MultiDataSource) query.getDataSource()).getSegmentsFromIntervals(intervals, timeline);
       serversLookup = toolChest.filterSegments(query, multiServersLookup);
     } else {
       // I wouldn't be using toString() as below in the actual PR. This is just to explain my thoughts :)
       serversLookup = toolChest.filterSegments(
           query,
           intervals.stream()
                    .flatMap(i -> timeline.get(query.getDataSource().toString()).lookup(i).stream())
                    .collect(Collectors.toList())
       );
     }
     // ... remainder of the method omitted
   }
   ```
   The `computeUncoveredIntervals` method would be modified accordingly as well.
   
   As you said, it is pretty straightforward for Union queries as it only has to deal with table datasources. Its implementation would be something like:
   
   ```
   List<TimelineObjectHolder<String, T>> getSegmentsFromIntervals(
       List<Interval> intervals,
       Map<String, ? extends TimelineLookup<String, T>> timelineMap
   )
   {
     // For each interval, look up segments in every union member that has a timeline.
     return intervals.stream()
                     .flatMap(i -> getDataSources().stream()
                                                   .filter(d -> timelineMap.containsKey(d.getName()))
                                                   .flatMap(d -> timelineMap.get(d.getName()).lookup(i).stream()))
                     .collect(Collectors.toList());
   }
   ```
   
   > Based on ^, I guess not, but If not, how does the broker distinguish which results belong to which timeline? Or does it matter? I'm having trouble imagining how this works in practice without making caching clustered client know a lot more about the query than it currently does, but maybe I haven't thought hard enough about this yet.
   
   For `MultiDataSource` implementations, the historicals would still return interval-based sequences as they do now; the only difference is that the sequences would contain results from multiple datasources for the same interval. Do you foresee this affecting the broker merge process for Union queries in some way?




[GitHub] [druid] gianm commented on issue #9791: Introduce extensibility support for Datasources

Posted by GitBox <gi...@apache.org>.
gianm commented on issue #9791:
URL: https://github.com/apache/druid/issues/9791#issuecomment-621611255


   Is the idea that MultiDataSource would be something that exists and runs on the Broker, or on Historicals, or both?
   
   This seems sort of similar to the `SegmentWrangler` interface, which is used by the Broker in LocalQuerySegmentWalker to do direct queries over inline and lookup datasources. I wonder if these can be combined? What do you think?
   
   If not, then I wonder if this thing you're trying to do could be done in a style similar to SegmentWrangler? (Check out its SegmentWranglerModule, etc.)




[GitHub] [druid] a2l007 edited a comment on issue #9791: Introduce extensibility support for Datasources

Posted by GitBox <gi...@apache.org>.
a2l007 edited a comment on issue #9791:
URL: https://github.com/apache/druid/issues/9791#issuecomment-656789106


   @jihoonson Thanks for taking a look.
   
   `MultiDataSource` is designed to let its implementations decide which datasources segments should be queried from for a given interval. This applies only to queries with multiple base tables in the `dataSource` part of the query (only `UnionDataSource` at this point). By missing segments, I mean the case where there are no segments in `table1` for a specific interval. This is not to be confused with the missing segments that are handled by the `RetryQueryRunner`.
   Expanding further on this example, let's say I have a union-based query against `table1` and `table2`, but I need data from `table2` only if there are no segments in `table1` for the interval. Currently this isn't possible using `UnionDataSource`. With `MultiDataSource`, I can create an implementation that satisfies this use case.
   
   To support this, the Broker will now identify the segments to be queried for all table datasources in the query in a single pass instead of one table datasource at a time. This makes it easier to push the segment selection logic to the `MultiDataSource` implementation for such queries. This diff would give a clearer picture: https://github.com/apache/druid/pull/10030/files#diff-14e0f52ca2d35d282c1e92d1c14eb0d1R365-R397
   Regarding broker-historical interaction, the queries from the broker to the historical will now include SegmentDescriptors from all the datasources requested in the union-based query. This would therefore require only one round trip between broker and historical (as long as there are no missing segments reported by `ReportTimelineMissingSegmentQueryRunner`).




[GitHub] [druid] a2l007 commented on issue #9791: Introduce extensibility support for Datasources

Posted by GitBox <gi...@apache.org>.
a2l007 commented on issue #9791:
URL: https://github.com/apache/druid/issues/9791#issuecomment-622111329


   @gianm  Thanks for taking a look. 
   
   Both the Broker and Historical would need to be aware of `MultiDataSource`, as Brokers would need this info to choose the custom segment selection strategy and Historicals would need to identify interval-specific timeline entries from multiple timelines. I’m not sure if I can use the `SegmentWrangler` in its current form since `MultiDataSource` is directed towards table datasources and table datasources would have to use `CachingClusteredClient`. 
   
   A `SegmentWrangler` implementation could be created with a custom `QuerySegmentWalker`, but the walker might end up being very similar to `CachingClusteredClient`, especially the `getQueryRunnerForIntervals` method.
   
   > Are there use cases you have in mind other than redesigning UnionDataSource?
   
   Apart from the UnionDataSource redesign, one other useful MultiDataSource implementation would be something like a Priority list datasource that accepts an ordered list of tables. 
   For example if the tables are `table1` and `table2`, the priority list datasource would attempt to serve every query out of `table1` and would use `table2` only for intervals with missing segments on `table1`.
   One of our internal use cases for a priority list datasource is a validation mechanism for data correction on production tables. It isn’t ideal to test out data fixes directly by reindexing the prod table, so it's better to have an experimental table where the validations can be performed before applying them to the prod table.
   A priority list based datasource implementation could be used for this use case, with the table list having the experimental table first followed by the prod table.
   Another potential use case for a priority list datasource is representing updates to existing data when combined with realtime ingestion.
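   
   To make the priority list idea concrete, here is a minimal sketch of the selection rule, not the actual implementation. `PriorityListSketch` is a hypothetical class, intervals and segments are plain strings, and a timeline is reduced to a map from interval to segment ids, so partially overlapping intervals are not modelled.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;
   
   // Hypothetical illustration of priority-based segment selection; not a Druid class.
   final class PriorityListSketch
   {
     // Tables in priority order, highest priority first.
     private final List<String> tables;
   
     PriorityListSketch(List<String> tables)
     {
       this.tables = tables;
     }
   
     // timelines: table name -> (interval -> segment ids).
     // For each interval, only the first table that has segments contributes.
     List<String> getSegmentsFromIntervals(
         List<String> intervals,
         Map<String, Map<String, List<String>>> timelines
     )
     {
       List<String> selected = new ArrayList<>();
       for (String interval : intervals) {
         for (String table : tables) {
           List<String> segments = timelines.getOrDefault(table, Collections.emptyMap())
                                            .getOrDefault(interval, Collections.emptyList());
           if (!segments.isEmpty()) {
             selected.addAll(segments);
             break; // lower-priority tables are ignored for this interval
           }
         }
       }
       return selected;
     }
   }
   ```
   
   With the experimental table listed first and the prod table second, an interval covered by both is served from the experimental table, while an interval that only prod covers falls through to prod.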




[GitHub] [druid] clintropolis commented on issue #9791: Introduce extensibility support for Datasources

Posted by GitBox <gi...@apache.org>.
clintropolis commented on issue #9791:
URL: https://github.com/apache/druid/issues/9791#issuecomment-622247628


   Ah, sorry, long day, and I think I confused myself by not thinking too hard about this, or alternatively by thinking way too hard about this; I guess this wouldn't really cause any issue for union datasource since it issues a homogeneous query across table datasources? I'm going to take a break and then I'll read through this again 😅




[GitHub] [druid] clintropolis commented on issue #9791: Introduce extensibility support for Datasources

Posted by GitBox <gi...@apache.org>.
clintropolis commented on issue #9791:
URL: https://github.com/apache/druid/issues/9791#issuecomment-622243481


   >The UnionDataSource can be refactored to be an implementation of this interface and the getSegmentsFromIntervals method would return all the segments from all the datasources defined in the union query.
   
   How does this work? I guess the broker needs to keep track of which segments are for which timeline and which sub part of the union query? Does it potentially issue multiple queries to a single server instead of the current single query per historical/peon with specific segment spec that it does now? If not, how does the broker distinguish which results belong to which timeline? I'm having trouble imagining how this works in practice without making caching clustered client know a lot more about the query than it currently does, but maybe I haven't thought hard enough about this yet.




[GitHub] [druid] clintropolis edited a comment on issue #9791: Introduce extensibility support for Datasources

Posted by GitBox <gi...@apache.org>.
clintropolis edited a comment on issue #9791:
URL: https://github.com/apache/druid/issues/9791#issuecomment-622243481


   >The UnionDataSource can be refactored to be an implementation of this interface and the getSegmentsFromIntervals method would return all the segments from all the datasources defined in the union query.
   
   How does this work? I guess the broker needs to keep track of which segments are for which timeline and which sub part of the union query? Does it potentially issue multiple queries to a single server instead of the current single query per historical/peon with specific segment spec that it does now? 
   >the broker can identify segments to be retrieved from all the requested datasources at the same time therefore the queries to the historical will now include segment specs from all the datasources requested in the union query
   
   Based on ^, I guess not, but if not, how does the broker distinguish which results belong to which timeline? I'm having trouble imagining how this works in practice without making caching clustered client know a lot more about the query than it currently does, but maybe I haven't thought hard enough about this yet.




[GitHub] [druid] gianm edited a comment on issue #9791: Introduce extensibility support for Datasources

Posted by GitBox <gi...@apache.org>.
gianm edited a comment on issue #9791:
URL: https://github.com/apache/druid/issues/9791#issuecomment-621611255


   Is the idea that MultiDataSource would be something that exists and runs on the Broker, or on Historicals, or both?
   
   This seems sort of similar to the `SegmentWrangler` interface, which is used by the Broker in LocalQuerySegmentWalker to do direct queries over inline and lookup datasources. I wonder if these can be combined? What do you think?
   
   If not, then I wonder if this thing you're trying to do could be done in a style similar to SegmentWrangler? (Check out its SegmentWranglerModule, etc.)
   
   Are there use cases you have in mind other than redesigning UnionDataSource?




[GitHub] [druid] a2l007 commented on issue #9791: Introduce extensibility support for Datasources

Posted by GitBox <gi...@apache.org>.
a2l007 commented on issue #9791:
URL: https://github.com/apache/druid/issues/9791#issuecomment-622376887


   @clintropolis Thanks for reviewing this.
   In order for the broker to support this, `CachingClusteredClient` will have to start handling a map of Datasource -> Timeline instead of just a Timeline object. Apart from this, the only other significant change would be something like this in the `computeSegmentsToQuery` method:
   
   ```
   private Set<SegmentServerSelector> computeSegmentsToQuery(
       Map<String, TimelineLookup<String, ServerSelector>> timeline
   )
   {
     final List<TimelineObjectHolder<String, ServerSelector>> serversLookup;
     if (query.getDataSource() instanceof MultiDataSource) {
       List<TimelineObjectHolder<String, ServerSelector>> multiServersLookup =
           ((MultiDataSource) query.getDataSource()).getSegmentsFromIntervals(intervals, timeline);
       serversLookup = toolChest.filterSegments(query, multiServersLookup);
     } else {
       // I wouldn't be using toString() as below in the actual PR. This is just to explain my thoughts :)
       serversLookup = toolChest.filterSegments(
           query,
           intervals.stream()
                    .flatMap(i -> timeline.get(query.getDataSource().toString()).lookup(i).stream())
                    .collect(Collectors.toList())
       );
     }
     // ... remainder of the method omitted
   }
   ```
   The `computeUncoveredIntervals` method would be modified accordingly as well.
   
   As you said, it is pretty straightforward for Union queries as it only has to deal with table datasources. Its implementation would be something like:
   
   ```
   List<TimelineObjectHolder<String, T>> getSegmentsFromIntervals(
       List<Interval> intervals,
       Map<String, ? extends TimelineLookup<String, T>> timelineMap
   )
   {
     // For each interval, look up segments in every union member that has a timeline.
     return intervals.stream()
                     .flatMap(i -> getDataSources().stream()
                                                   .filter(d -> timelineMap.containsKey(d.getName()))
                                                   .flatMap(d -> timelineMap.get(d.getName()).lookup(i).stream()))
                     .collect(Collectors.toList());
   }
   ```
   
   > Based on ^, I guess not, but If not, how does the broker distinguish which results belong to which timeline? Or does it matter? I'm having trouble imagining how this works in practice without making caching clustered client know a lot more about the query than it currently does, but maybe I haven't thought hard enough about this yet.
   
   For `MultiDataSource` implementations, the historicals would still return interval-based sequences as they do now; the only difference is that the sequences would contain results from multiple datasources for the same interval. Do you foresee this affecting the broker merge process for Union queries in some way?




[GitHub] [druid] a2l007 commented on issue #9791: Introduce extensibility support for Datasources

Posted by GitBox <gi...@apache.org>.
a2l007 commented on issue #9791:
URL: https://github.com/apache/druid/issues/9791#issuecomment-656789106


   @jihoonson Thanks for taking a look.
   
   `MultiDataSource` is designed to let its implementations decide which datasources segments should be queried from for a given interval. This applies only to queries with multiple base tables in the `dataSource` part of the query (only `UnionDataSource` at this point). By missing segments, I mean the case where there are no segments in `table1` for a specific interval. This is not to be confused with the missing segments that are handled by the `RetryQueryRunner`.
   Expanding further on this example, let's say I have a union-based query against `table1` and `table2`, but I need data from `table2` only if there are no segments in `table1` for the interval. Currently this isn't possible using `UnionDataSource`. With `MultiDataSource`, I can create an implementation that satisfies this use case.
   
   To support this, the Broker will now identify the segments to be queried for all table datasources in the query in a single pass instead of one table datasource at a time. This makes it easier to push the segment selection logic to the `MultiDataSource` implementation for such queries. This diff would give a clearer picture: https://github.com/apache/druid/pull/10030/files#diff-14e0f52ca2d35d282c1e92d1c14eb0d1R374-R395
   Regarding broker-historical interaction, the queries from the broker to the historical will now include SegmentDescriptors from all the datasources requested in the union-based query. This would therefore require only one round trip between broker and historical (as long as there are no missing segments reported by `ReportTimelineMissingSegmentQueryRunner`).




[GitHub] [druid] clintropolis edited a comment on issue #9791: Introduce extensibility support for Datasources

Posted by GitBox <gi...@apache.org>.
clintropolis edited a comment on issue #9791:
URL: https://github.com/apache/druid/issues/9791#issuecomment-622243481


   >The UnionDataSource can be refactored to be an implementation of this interface and the getSegmentsFromIntervals method would return all the segments from all the datasources defined in the union query.
   
   How does this work? I guess the broker needs to keep track of which segments are for which timeline and which sub part of the union query? Does it potentially issue multiple queries to a single server instead of the current single query per historical/peon with specific segment spec that it does now? 
   >the broker can identify segments to be retrieved from all the requested datasources at the same time therefore the queries to the historical will now include segment specs from all the datasources requested in the union query
   
   Based on ^, I guess not, but if not, how does the broker distinguish which results belong to which timeline? Or does it matter? I'm having trouble imagining how this works in practice without making caching clustered client know a lot more about the query than it currently does, but maybe I haven't thought hard enough about this yet.

