Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/02/15 18:11:45 UTC

[GitHub] [druid] gianm opened a new issue #12262: Multi-stage distributed queries

gianm opened a new issue #12262:
URL: https://github.com/apache/druid/issues/12262


   This proposal is about extending Druid to support multi-stage distributed queries, something that I think will be really exciting and amp up what Druid is capable of.
   
   ## Motivation
   
   Today Druid's distributed query stack is single-stage: the Broker receives a query, slices it up into pieces, and sends one piece to each Historical. The Historicals produce partial results for their piece of the query, send them back to the Broker, and the Broker merges them into the final result set.
   
   This venerable design works well when the bulk of processing can be done at the leaf servers that store the actual data. It scales well: Druid can handle thousands of queries per second with high CPU and memory efficiency, so long as there are enough Historicals running and so long as the amount of data that makes it to the Broker is relatively small.
   
   Druid has a lot of functionality that is designed to make this single-stage distributed query stack as useful as possible. Whenever possible, we push down projections, filters, aggregations, limits, and joins. We also have a suite of builtin approximations, like approximate topN and various sketches, that people can use to minimize the amount of state that must be tracked beyond the leaf servers.
   
   But there are some cases where this query stack doesn't work well:
   
   1. Queries with very large result sets. Imagine a GROUP BY query with no LIMIT that would return billions of rows, because it groups on something with very high cardinality. Today these result sets must flow through the Broker, which creates a bottleneck.
   2. Additional SQL support. We'd like to keep expanding Druid's ability to support SQL until we can do it all. Some operations that we'd like to support, like joining together two distributed tables, cannot be done in the single-stage design.
   3. Complex query structures. Today, a query with lots of JOINs and subqueries will execute certain queries in a fully distributed manner -- basically, anything that can be represented as a base distributed table joined with broadcasted tables -- and then finish the remainder of query processing on the Broker. In some cases this works OK, because most of the work can be done as part of the distributed queries. But in other cases it results in a lot of data being collected on the Broker, which leads to an error like "Subquery generated results beyond maximum".
   4. Ingestion. Some databases use their query stack to handle ingestion. It makes sense if you think about an ingestion as a query that writes its results to a table. It would be nice if we could do this too, so we wouldn't need to maintain an ingestion execution stack that is separate from the query execution stack.
   
   ## Design
   
   There are a lot of interesting pieces here, so I just want to touch on each one in this main proposal. Each one should then be fleshed out separately.
   
   ### Query representation and SQL planning
   
   **Background**
   
   Druid models native queries as "fat" query types that represent a full distributed query. There are four query types used by the SQL engine: scan, timeseries, topN, and groupBy. Each query type represents an entire single-stage distributed computation: there is a first piece that can run distributed, and a second piece that must run on the Broker. They all internally handle filtering, projection, sorting, limiting, etc. They all have a "dataSource" field that describes where they should get their data from. It can be "table", representing an actual Druid datasource; "query", representing a subquery of any query type; "join", representing two other datasources joined together; or one of a handful of other less-common datasource types.
   
   The SQL planner's main job is to morph a tree of relational operators into either a single native query or a tree of native queries. In the latter case, it uses "query" or "join" datasources to link the native queries together. It has limited flexibility in how it does this, because native queries have a rigid computational structure: they always do broadcast join first, then projection, then filter, then aggregation, then sort.
   
   **Proposal**
   
   To support ever-more complex query structures, we will need a way of representing data flow that is more flexible than native queries. The tried and true DAG approach will work well here. So I propose that the multi-stage engine should model queries as a DAG of "stages". Each stage:
   
   - Runs distributed across as many servers as makes sense.
   - May consume partitioned data from some set of input stages. There will always be some stages with no input stages; these stages would be reading from Druid segments, external data, or something like that.
   - Produces data either aligned with its input partitions, or reshuffled in some way.
   
   The stages would be finer-grained than native queries. For example, a stage might do scan-filter-project, or aggregate, or sort, or limit. This is a common approach in relational databases.
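
As a minimal sketch of the stage DAG described above, the structure could look something like the following. The names `Stage`, `inputs`, `shuffle_key`, and `execution_order` are hypothetical and chosen only for illustration; they are not existing Druid classes. The example uses two leaf stages feeding a join, to show why a DAG (rather than a simple chain) is needed.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple


@dataclass(frozen=True)
class Stage:
    """One node of the query DAG (hypothetical structure, not a Druid class)."""
    name: str
    inputs: Tuple[str, ...] = ()       # upstream stage names; empty for leaf stages
    shuffle_key: Optional[str] = None  # None: output stays aligned with input partitions


def execution_order(stages: List[Stage]) -> List[str]:
    """Return stage names ordered so each stage follows all of its inputs."""
    by_name: Dict[str, Stage] = {stage.name: stage for stage in stages}
    ordered: List[str] = []
    in_progress: Set[str] = set()

    def visit(name: str) -> None:
        if name in ordered:
            return
        if name in in_progress:
            raise ValueError(f"cycle detected at stage {name!r}")
        in_progress.add(name)
        for upstream in by_name[name].inputs:
            visit(upstream)
        in_progress.discard(name)
        ordered.append(name)

    for stage in stages:
        visit(stage.name)
    return ordered


# Two leaf stages read data and reshuffle on the join key; a join stage
# consumes both, and a final stage aggregates the joined rows.
example_stages = [
    Stage("aggregate", inputs=("join",), shuffle_key="country"),
    Stage("join", inputs=("scan_orders", "scan_users")),
    Stage("scan_orders", shuffle_key="user_id"),
    Stage("scan_users", shuffle_key="user_id"),
]
print(execution_order(example_stages))
```

A cluster-level coordinator could walk this order to decide which stages are ready to launch; a real implementation would also need to track partition counts and worker assignments, which this sketch leaves out.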
   
   As a first step, I suggest we keep native queries in the picture by translating SQL -> native query -> DAG-based query. This can be done with minimal changes to the SQL planner, and would enable classic native queries to exist at the same time as DAG-based queries without much code duplication. But at some point, we'll want to go directly from SQL -> DAG-based query, since that will give the SQL planner more flexibility to reorder operations.
   
   Here's an example of how a query would look. This SQL query:
   
   ```
   SELECT
     session,
     COUNT(*) AS cnt
   FROM tbl
   WHERE browser = 'Firefox'
   GROUP BY session
   ORDER BY cnt DESC
   LIMIT 10
   ```
   
   Would have five stages:
   
   1. Scan: Read `tbl`, apply the filter `browser = 'Firefox'`, and project out `session`. No shuffle.
   2. Aggregate I: Locally group by `session` and compute `count(*)` for each. Shuffle by `session`.
   3. Aggregate II: Continue grouping by `session` within each partition, and sum the partial counts to get full counts. No shuffle. This produces a fully grouped resultset, still partitioned by `session`.
   4. Sort: Locally order by `cnt DESC`. Shuffle everything into a single partition.
   5. Limit: Take the first 10 rows from the single partition generated by the prior stage.
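
The five stages above can be simulated in a single process to make the data flow concrete. This is only an illustrative sketch: the function names, the in-memory "segments", the partition count, and the CRC32-based partitioner are all assumptions, and nothing here is distributed.

```python
import heapq
import itertools
import zlib
from collections import Counter

NUM_PARTITIONS = 2  # arbitrary choice for this sketch


def scan(segment):
    """Stage 1: apply the browser filter and project out session. No shuffle."""
    return [row["session"] for row in segment if row["browser"] == "Firefox"]


def aggregate_partial(sessions):
    """Stage 2: compute partial counts per session on the local worker."""
    return Counter(sessions)


def shuffle_by_session(partials):
    """Stage 2 output: route each (session, count) pair by a hash of session."""
    partitions = [[] for _ in range(NUM_PARTITIONS)]
    for partial in partials:
        for session, cnt in partial.items():
            target = zlib.crc32(session.encode()) % NUM_PARTITIONS
            partitions[target].append((session, cnt))
    return partitions


def aggregate_final(partition):
    """Stage 3: sum the partial counts within one partition."""
    totals = Counter()
    for session, cnt in partition:
        totals[session] += cnt
    return totals


def sort_key(pair):
    """Order by cnt DESC; session breaks ties so the output is deterministic."""
    return (-pair[1], pair[0])


def run_query(segments, limit):
    partials = [aggregate_partial(scan(segment)) for segment in segments]
    totals = [aggregate_final(p) for p in shuffle_by_session(partials)]
    # Stage 4: sort locally, then gather the sorted runs into a single partition.
    sorted_runs = [sorted(t.items(), key=sort_key) for t in totals]
    single_partition = heapq.merge(*sorted_runs, key=sort_key)
    # Stage 5: take the first `limit` rows.
    return list(itertools.islice(single_partition, limit))


segments = [
    [{"session": "s1", "browser": "Firefox"},
     {"session": "s2", "browser": "Chrome"},
     {"session": "s1", "browser": "Firefox"}],
    [{"session": "s1", "browser": "Firefox"},
     {"session": "s3", "browser": "Firefox"},
     {"session": "s3", "browser": "Firefox"},
     {"session": "s2", "browser": "Firefox"}],
]
print(run_query(segments, limit=2))
```

Because every row for a given `session` lands in the same partition after the shuffle, the final counts do not depend on how many partitions are used.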
   
   When stages connect without shuffling, we can pipeline execution locally on the same servers, so there is no need for buffering or cross-server traffic. When stages connect _with_ shuffling, we'll need to exchange data across servers. Depending on the needs of the query, the producing stage could stream to the consuming stage, or the producing stage could buffer up all results before the consuming stage starts.
   
   For the query above, if we know (or are willing to bet) that there are not too many distinct `session` values then we can run it just as efficiently as the current single-stage approach. First, we'll make sure that Scan and Aggregate I are scheduled on the same set of workers, so the output of Scan can be pipelined into Aggregate I locally in memory. Then, we'll configure the shuffle in Aggregate I to shuffle everything down to a single partition. Next, we'll make sure that Aggregate II, Sort, and Limit all run on the same server. That server would be responsible for gathering all the partitioned Aggregate I outputs and preparing the final result. Finally, we'll set things up so all stages run concurrently, and so Aggregate II streams from Aggregate I. Put together, this is exactly what the Historicals and Brokers do today.
   
   ### Ingestion
   
   In #11929 there is a proposal for adding a SQL INSERT statement. It could be implemented on top of our existing batch ingestion tasks: the SQL layer could convert relational operators to indexing tasks just as well as it can convert them to native queries. But I think it would be nicer to implement it on top of a multi-stage query stack.
   
   We'd just need to do two things:
   
   1. Provide a way for the query stack to read external data. There's a natural way to do this through an "external" DataSource that maps onto RowBasedSegments. It would be similar to how we plug lookups into the query engines, except a little more complicated because we'll need to split the external datasource before sending it down to various servers.
   2. Provide a way for the query stack to generate segments. Once we have this multi-stage structure this is also natural: we can add a final stage to any query that shuffles data to match the target segment size and then generates and publishes segments.
   
   So I propose that we work towards this instead of having the SQL planner generate batch indexing tasks. It has a nice side benefit: part (1) alone means that we'd be able to query external data in a regular query too. I don't think that'll be a core use case for Druid, but it has some usefulness, like previewing what an INSERT _might_ do, or doing an ad-hoc join of Druid datasources with some external data.
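
A rough sketch of the two pieces, under heavy assumptions: `split_external_input` stands in for splitting an external datasource across workers, and `plan_segments` stands in for the final stage that sizes its output to a target segment size. Both function names are hypothetical, the round-robin assignment is just one possible policy, and real segment generation would also need to account for time partitioning and publishing, which are omitted here.

```python
from typing import List, Sequence


def split_external_input(files: Sequence[str], num_workers: int) -> List[List[str]]:
    """Assign input files round-robin so each worker reads a disjoint subset."""
    return [list(files[i::num_workers]) for i in range(num_workers)]


def plan_segments(rows: Sequence[dict], target_rows_per_segment: int) -> List[List[dict]]:
    """Final stage: cut the shuffled rows into chunks of at most the target size."""
    return [
        list(rows[start:start + target_rows_per_segment])
        for start in range(0, len(rows), target_rows_per_segment)
    ]


splits = split_external_input(["a.json", "b.json", "c.json", "d.json", "e.json"], 2)
chunks = plan_segments([{"n": i} for i in range(7)], target_rows_per_segment=3)
print(splits)
print([len(chunk) for chunk in chunks])
```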
   
   We'll also need to figure out what to do about streaming ingest at some point. I'm not sure what to do there but I think there are a few options that make sense. Even if the multi-stage query stack doesn't have builtin support for streaming queries, we can layer streamingness on top in the same way that we do today with indexing tasks: there is a supervisor that manages a series of tasks, each of which reads a chunk of data from Kafka and then publishes it.
   
   ### Server setup
   
   There is an obvious question: where will this query engine run? Today we have Brokers, Historicals, and indexing stuff (MiddleManager or Indexers). I don't want to add a net new service or process type in the long run, because we have a lot already. But I think in the short run we should add a new process type.
   
   I think at first the Broker should remain in charge of SQL planning, and may route certain queries to these new multi-stage query processes if they are running in that particular cluster. It could do this based on user request (like a context parameter) or based on aspects of the query (like presence of INSERT, or external data, or a "complex enough" query).
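
To illustrate the routing decision, here is a small sketch. Everything specific in it is an assumption made for the example: the context parameter name `engine`, the engine labels, and the idea of counting `JOIN` keywords as a stand-in for "complex enough". A real Broker would decide from the planned query rather than from the SQL text.

```python
import re

JOIN_THRESHOLD = 3  # hypothetical cutoff for a "complex enough" query


def choose_engine(sql: str, context: dict, multi_stage_available: bool) -> str:
    """Pick an engine from the user request and simple aspects of the query."""
    normalized = sql.strip().upper()
    is_insert = normalized.startswith("INSERT")
    if not multi_stage_available:
        if is_insert:
            raise ValueError("INSERT needs multi-stage processes, but none are running")
        return "single-stage"
    requested = context.get("engine")  # hypothetical context parameter name
    if requested == "multi-stage" or is_insert:
        return "multi-stage"
    if requested == "single-stage":
        return "single-stage"
    join_count = len(re.findall(r"\bJOIN\b", normalized))
    return "multi-stage" if join_count >= JOIN_THRESHOLD else "single-stage"


print(choose_engine("SELECT COUNT(*) FROM tbl", {}, True))
print(choose_engine("INSERT INTO tbl SELECT * FROM src", {}, True))
```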
   
   However, like I said, in the long run I don't think it's a good plan to have an extra process type on top of all the ones we already have. So I think in the long run we should shoot for this process type actually being able to serve as a replacement for the Broker, Historicals, and MMs/Indexers. By that I mean that in the fullness of time, it should be able to:
   
   - Receive and plan SQL queries, like a Broker.
   - Cache segments locally, like a Historical.
   - Run simple single-stage queries in an extremely efficient way, like the Historical + Broker combo.
   - Run ingestions (INSERT statements) in a distributed and scalable manner, like MMs or Indexers.
   
   At that point, a small Druid cluster would be pretty simple: just a Coordinator and this new process type (or a few of them for scale-out). And a large cluster could still specialize. Imagine setting up a tier of these new processes that cache segments but don't perform ingestion, and another tier that _don't_ cache segments but _do_ perform ingestion. That's similar to setting up Historicals and MMs today, but more configuration-driven. It would allow simple clusters to be simple and complex clusters to be complex.
   
   ## Code changes
   
   A sketch of the code changes I think we'll need to make the above work out:
   
   1. Low-level mechanism for shuffling data between servers. This should include an efficient way to transfer large amounts of data over the network. Currently we use Smile-over-HTTP for this, which works well for relatively small amounts of data, but I think we can do better. It should also include both a pipelined implementation (producing stage streams to the consuming stage), and a buffered implementation (producing stage generates all data before consuming stage starts) for operations that cannot be pipelined.
   2. Cluster-level execution coordinator for a multi-stage query.
   3. Server-local execution coordinator for a single stage of a multi-stage query.
   4. An actual process type for the above three things to run in.
   5. Converter that generates multi-stage (DAG-based) queries from today's native query types. (This enables hooking into the existing SQL layer, since it generates native queries.)
   6. To make ingestion-through-query work: adapters from external data into the query stack, and from the query stack to the segment-generation code.
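
The difference between the pipelined and buffered shuffle modes in item 1 can be shown with plain Python generators. This is a single-process sketch with made-up function names; it only demonstrates the ordering of work, not network transfer.

```python
def producing_stage(rows, log):
    """Emit rows one at a time, recording when each one is produced."""
    for row in rows:
        log.append(f"produce {row}")
        yield row


def pipelined(upstream):
    """Hand each row to the consumer as soon as the producer emits it."""
    yield from upstream


def buffered(upstream):
    """Collect the producer's entire output before the consumer sees any of it."""
    buffer = list(upstream)
    yield from buffer


def consuming_stage(stream, log):
    for row in stream:
        log.append(f"consume {row}")


pipelined_log, buffered_log = [], []
consuming_stage(pipelined(producing_stage([1, 2], pipelined_log)), pipelined_log)
consuming_stage(buffered(producing_stage([1, 2], buffered_log)), buffered_log)
print(pipelined_log)
print(buffered_log)
```

In the pipelined log, production and consumption interleave; in the buffered log, all production finishes first. The buffered form is what an operation such as a full sort requires, since it cannot emit its first row until it has seen its last input row.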
   
   At Imply we've started to prototype a multi-stage query engine with the above design. We've implemented some of the pieces: not all of them, but enough to perform some basic queries. At this point I thought it would be a good time to open up a discussion with the wider community, since as we continue working on this stuff, we'd like to integrate it into Apache Druid.
   
   ## Rationale
   
   A.K.A. the "why not" section.
   
   **Why not integrate with an existing open-source query engine?**
   
   I wouldn't want to stop anyone from integrating with another open-source query engine. Some people might prefer to deploy that way.
   
   But I think we will want this kind of functionality as something native in Druid, for two reasons. First: I think the user experience will be nicer if we don't require a dependency on another system. Second: I expect there will be opportunities for optimization that we can use if everything is built in to Druid, and that would be tough to implement with an external query stack.
   
   Besides, the work we'll need to do in order to build a multi-stage query engine will also benefit integrations with other engines.
   
   **Why not gradually add multi-stage capabilities to the existing query stack?**
   
   I think it's a question of complexity.
   
   The core query stack has three main hook points: individual segment scan, result merge on Historicals, and result merge on Broker. The implementations of the native query types, especially groupBy, have become quite complex over the years as new features have been added and needed to be mapped onto these hook points. For example: subtotals, ordering, limiting, and having are all typically done as part of "result merge on the Broker". The hook points are doing more work than originally envisioned, which makes the code more difficult to follow and extend.
   
   So I think it's time to try a different approach with the query stack, rather than adding new capabilities. As we create a DAG-based query stack, we can split queries up into stages and move more of the "fancy stuff" to the framework, which will simplify the query-specific logic.
   
   We can still share code, though. It won't be 100% new. We can share all of the StorageAdapter stuff (cursors, etc), the ColumnSelectorFactory stuff, and the single-segment processing code for groupBy and topN. This lower-level code is super optimized and works well. It's more the mid/high level stuff that IMO would benefit from a different approach.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] gianm commented on issue #12262: Multi-stage distributed queries

Posted by GitBox <gi...@apache.org>.
gianm commented on issue #12262:
URL: https://github.com/apache/druid/issues/12262#issuecomment-1063319581


   > I'm also wondering if the core of Druid is combined into one process, is it possible to make Druid a built-in component, like an embedded database system, for other systems to integrate? I have this idea because we're using Druid to store application metrics, but for Druid's own metrics, they're emitted into another time-series database such as Prometheus.
   > If Druid can store its own emitted metrics by using its own storage and query capability, it can greatly simplify the operation.
   
   @FrankChen021 I think that would be great, and it is indeed a goal I have in mind with this work. It should be possible to have a pretty simple self-contained single-server deployment. You'd need to worry more about backups, though, if you care about that sort of thing…




[GitHub] [druid] gianm edited a comment on issue #12262: Multi-stage distributed queries

Posted by GitBox <gi...@apache.org>.
gianm edited a comment on issue #12262:
URL: https://github.com/apache/druid/issues/12262#issuecomment-1055125017


   > Gian, Are there any algorithms that would be made possible by this architecture? (By algorithms, I mean distributed implementations of relational operators, e.g. shuffle-sort, shuffle-join, broadcast-join, hybrid-hash-join, use of bloom filters.) If so, can you list the algorithms that you have in mind?
   
   @julianhyde I had in mind that a "partition-sort-shuffle-merge" operation would be the basic building block. The "execution coordinator" I mentioned should understand those, & for each stage should accept parameters like:
   
   - set of input stages (this is what makes it a DAG)
   - which input stages should be broadcast and which should be partitioned
   - shuffle key for output
   - whether the shuffled output needs to be sorted (if not, we can stream through the stage; if so, we need a buffer between stages)
   
   With that building block I think the most useful algorithms to implement at first would be shuffle sort (trivial given the building block), sort-based aggregation, sort-merge join, broadcast hash join, and shuffle hash join.
   
   Btw: I think a lot of these algorithms would be hybrid in some way if you consider the flow end to end. For example: by "sort-based aggregation" I just mean the distributed part. I had imagined the part that happens local to the leaf server still being hash-based, followed by a sort-based distributed step.
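
A minimal single-process sketch of a "partition-sort-shuffle-merge" step, to make the building block concrete. It assumes hash partitioning on the shuffle key (range partitioning would be another option) and uses hypothetical function names. Each producing worker buckets and sorts its own rows, and each output partition is then built by merging the sorted runs it receives from every worker.

```python
import heapq
import zlib


def partition_and_sort(rows, key, num_partitions):
    """On one producing worker: bucket rows by a hash of the key, then sort each bucket."""
    buckets = [[] for _ in range(num_partitions)]
    for row in rows:
        target = zlib.crc32(str(row[key]).encode()) % num_partitions
        buckets[target].append(row)
    return [sorted(bucket, key=lambda r: r[key]) for bucket in buckets]


def shuffle_and_merge(per_worker_buckets, key):
    """For each output partition: merge the already-sorted runs from all workers."""
    num_partitions = len(per_worker_buckets[0])
    return [
        list(heapq.merge(*(worker[p] for worker in per_worker_buckets),
                         key=lambda r: r[key]))
        for p in range(num_partitions)
    ]


worker_inputs = [
    [{"session": "s3", "n": 1}, {"session": "s1", "n": 2}],
    [{"session": "s2", "n": 3}, {"session": "s1", "n": 4}, {"session": "s3", "n": 5}],
]
per_worker = [partition_and_sort(rows, "session", 2) for rows in worker_inputs]
output_partitions = shuffle_and_merge(per_worker, "session")
print(output_partitions)
```

With sorted, key-aligned partitions in hand, sort-based aggregation and sort-merge join both reduce to a single pass over each partition.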




[GitHub] [druid] FrankChen021 commented on issue #12262: Multi-stage distributed queries

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on issue #12262:
URL: https://github.com/apache/druid/issues/12262#issuecomment-1059984018


   > At that point, a small Druid cluster would be pretty simple: just a Coordinator and this new process type (or a few of them for scale-out)
   
   This is a big advance for Druid that addresses a current pain point: its complex concepts and deployment. One of the reasons that many people now prefer ClickHouse as their analytic DBMS is its simplicity: they only have to download one process and start it, and then they can import data and query.
   
   I'm also wondering if the core of Druid is combined into one process, is it possible to make Druid a built-in component, like an embedded database system, for other systems to integrate? I have this idea because we're using Druid to store application metrics, but for Druid's own metrics, they're emitted into another time-series database such as Prometheus.
   If Druid can store its own emitted metrics by using its own storage and query capability, it can greatly simplify the operation.




[GitHub] [druid] julianhyde commented on issue #12262: Multi-stage distributed queries

Posted by GitBox <gi...@apache.org>.
julianhyde commented on issue #12262:
URL: https://github.com/apache/druid/issues/12262#issuecomment-1054594201


   Gian, Are there any algorithms that would be made possible by this architecture? (By algorithms, I mean distributed implementations of relational operators, e.g. shuffle-sort, shuffle-join, broadcast-join, hybrid-hash-join, use of bloom filters.) If so, can you list the algorithms that you have in mind?




[GitHub] [druid] samarthjain commented on issue #12262: Multi-stage distributed queries

Posted by GitBox <gi...@apache.org>.
samarthjain commented on issue #12262:
URL: https://github.com/apache/druid/issues/12262#issuecomment-1054567811


   Thank you for the proposal, Gian! Building a DAG-based query execution model seems like the next logical thing to do in Druid. I am excited to see progress on that front.
   
   Below are some ad hoc comments:
   
   1) Build for resilience - as queries get more and more complex, the chances of them running into failures because of network blips or bad hardware go up. As we build these new capabilities, we should think about building in resilience, including the ability to restart stages, exponential backoff in case of network partitions, speculative execution of a certain percentage of tasks, etc.
   
   2) Resource fairness - currently Druid has limited support for ensuring resource fairness outside of query laning. As queries supported by Druid get more complex, they will push boundaries on memory, disk, CPU, and network. Ensuring fair resource usage, so that queries do not starve each other or affect overall system stability, will be critical, IMHO.
   
   3) Scaling and decoupling of shuffle servers - considering shuffle servers will be mostly stateless other than storing intermediate query state, to me it makes sense to have them as independent servers not serving any other Druid functionality. This would make it easier to scale them up.
   
   4) A UI to show the query DAG and plan would be good to have.
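
As one small illustration of point 1, restarting a failed stage with exponential backoff could be sketched as below. The function name, the choice of `ConnectionError` as the retryable failure, and the default delays are assumptions for the example only; the `sleep` parameter exists so the delay can be replaced in tests.

```python
import time


def run_stage_with_backoff(stage_fn, max_attempts=4, base_delay=1.0, sleep=time.sleep):
    """Run stage_fn, retrying on ConnectionError and doubling the wait each time."""
    for attempt in range(max_attempts):
        try:
            return stage_fn()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the failure to the caller
            sleep(base_delay * 2 ** attempt)


# Usage: a stage that fails twice with a network error, then succeeds.
failures_left = [2]
recorded_delays = []


def flaky_stage():
    if failures_left[0] > 0:
        failures_left[0] -= 1
        raise ConnectionError("network blip")
    return "stage output"


result = run_stage_with_backoff(flaky_stage, sleep=recorded_delays.append)
print(result, recorded_delays)
```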
   




[GitHub] [druid] gianm commented on issue #12262: Multi-stage distributed queries

Posted by GitBox <gi...@apache.org>.
gianm commented on issue #12262:
URL: https://github.com/apache/druid/issues/12262#issuecomment-1055125017


   > Gian, Are there any algorithms that would be made possible by this architecture? (By algorithms, I mean distributed implementations of relational operators, e.g. shuffle-sort, shuffle-join, broadcast-join, hybrid-hash-join, use of bloom filters.) If so, can you list the algorithms that you have in mind?
   
   @julianhyde I had in mind that a "partition-sort-shuffle-merge" operation would be the basic building block. The "execution coordinator" I mentioned should understand those, & for each stage should accept parameters like:
   
   - set of input stages
   - which input stages should be broadcast and which should be partitioned
   - shuffle key for output
   - whether the shuffled output needs to be sorted (if not, we can stream through the stage; if so, we need a buffer between stages)
   
   With that building block I think the most useful algorithms to implement at first would be shuffle sort (trivial given the building block), sort-based aggregation, sort-merge join, broadcast hash join, and shuffle hash join.




[GitHub] [druid] gianm commented on issue #12262: Multi-stage distributed queries

Posted by GitBox <gi...@apache.org>.
gianm commented on issue #12262:
URL: https://github.com/apache/druid/issues/12262#issuecomment-1055084660


   @samarthjain Thanks for your comments. I totally agree about the importance of fault tolerance and good resource management. Both will matter more as queries get more complex and longer-running.
   
   > Resource fairness
   
   For resource management, I was thinking that for really big clusters, tiering (i.e. isolation of different workloads onto different servers) is going to be important. That way you can avoid sharing caches, etc between your interactive and your noninteractive workloads. I was thinking that in a dynamic cloudy sort of environment, you'd want the servers running interactive queries to run persistently, whereas you'd want the servers running multi-stage queries to be spun up on demand.
   
   In smaller or less dynamic clusters, where you're likely to run mixed workloads on the same servers, I agree we'll need to improve the current resource management system to make that work better. I have some fuzzy ideas about this but it really deserves to be fleshed out into its own proposal. I think it'll need to involve some degree of dynamism, like observing how many resources queries are actually using and adjusting their priorities appropriately.
   
   > Scaling and decoupling of shuffle servers
   
   I'd like to provide a smooth experience for both small clusters and large ones. I was thinking that for small clusters, or even single servers, you'd want the same servers doing all kinds of functionality, to keep management simple. For larger clusters I was thinking you'd want more isolation and independent scaling. I think we can make this possible with a server type that can handle a wide array of functionality, coupled with the ability to configure specific instances to only do one specific thing if you want independence.
   
   > A UI to show the query DAG and plan would be good to have.
   
   Yes!!




[GitHub] [druid] julianhyde commented on issue #12262: Multi-stage distributed queries

Posted by GitBox <gi...@apache.org>.
julianhyde commented on issue #12262:
URL: https://github.com/apache/druid/issues/12262#issuecomment-1063523547


   @FrankChen021 When I was building the Druid adapter for Calcite I would have *loved* an embedded Druid option. It would have saved me lots of effort provisioning a VM (with enough memory for the 4 or 5 JVMs that Druid requires) in which to run the unit tests. I suspect that an embedded Druid (even - if I could dare to hope - one that is single-threaded for common tasks like preparing and executing queries) would be very helpful for Druid developers too. It's so nice to be able to start up and stop the whole server in your IDE.




[GitHub] [druid] julianhyde commented on issue #12262: Multi-stage distributed queries

Posted by GitBox <gi...@apache.org>.
julianhyde commented on issue #12262:
URL: https://github.com/apache/druid/issues/12262#issuecomment-1056124292


   That makes sense. I mentioned Bloom filters because they (like correlated restarts) need to travel in the opposite direction to the flow of data, which complicates execution. It's wise to start off with something simple. Other algorithms can be added later.




[GitHub] [druid] gianm commented on issue #12262: Multi-stage distributed queries

Posted by GitBox <gi...@apache.org>.
gianm commented on issue #12262:
URL: https://github.com/apache/druid/issues/12262#issuecomment-1059631099


   @julianhyde Do you have a reference for what sort of algorithm you had in mind that uses such opposite-direction-flowing Bloom filters? I'd like to read more about it.




[GitHub] [druid] gianm commented on issue #12262: Multi-stage distributed queries

Posted by GitBox <gi...@apache.org>.
gianm commented on issue #12262:
URL: https://github.com/apache/druid/issues/12262#issuecomment-1063320736


   > @gianm When you are doing a big join with filters on both sides - e.g. orders from customers in California for red products - then you only want to read customers who may have bought a red product, and also only want to read orders placed by a customer in California. Thus you want a filter implied by the join to travel both ways, and you can handle a few false positives. A good way is to generate a bloom filter as you are scanning customers and pass it to the scan of products, and vice versa.
   
   Ah, that's a cool idea. I hadn't been thinking about it before, but now I will be, which is good 🙂. Thanks for bringing it up.




[GitHub] [druid] gianm edited a comment on issue #12262: Multi-stage distributed queries

Posted by GitBox <gi...@apache.org>.
gianm edited a comment on issue #12262:
URL: https://github.com/apache/druid/issues/12262#issuecomment-1055125017


   > Gian, Are there any algorithms that would be made possible by this architecture? (By algorithms, I mean distributed implementations of relational operators, e.g. shuffle-sort, shuffle-join, broadcast-join, hybrid-hash-join, use of bloom filters.) If so, can you list the algorithms that you have in mind?
   
   @julianhyde I had in mind that a "partition-sort-shuffle-merge" operation would be the basic building block. The "execution coordinator" I mentioned should understand those, and for each stage should accept parameters like:
   
   - set of input stages
   - which input stages should be broadcast and which should be partitioned
   - shuffle key for output
   - whether the shuffled output needs to be sorted (if not, we can stream through the stage; if so, we need a buffer between stages)
   
   With that building block I think the most useful algorithms to implement at first would be shuffle sort (trivial given the building block), sort-based aggregation, sort-merge join, broadcast hash join, and shuffle hash join.
   
   Btw: I think a lot of these algorithms would be hybrid in some way if you consider the flow end to end. For example: by "sort-based aggregation" I just mean the distributed part. I had imagined the part that happens local to the leaf server still being hash-based, followed by a sort-based distributed step.
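
To make the per-stage parameters listed above more concrete, here is a minimal Python sketch of how a stage description and its DAG ordering might look. The names `StageSpec`, `needs_buffer`, and `execution_order` are hypothetical and chosen for illustration only; they are not part of Druid or of this proposal.

```python
from dataclasses import dataclass, field
from graphlib import TopologicalSorter
from typing import Dict, List, Optional, Tuple


@dataclass
class StageSpec:
    """Hypothetical description of one stage (illustrative, not a Druid class)."""
    stage_id: int
    # Input stage id -> delivery mode, either "broadcast" or "partitioned".
    inputs: Dict[int, str] = field(default_factory=dict)
    # Columns used to shuffle this stage's output, if any.
    shuffle_key: Optional[Tuple[str, ...]] = None
    # Whether the shuffled output must be sorted.
    sort_output: bool = False

    def needs_buffer(self) -> bool:
        # Unsorted output can be streamed to the next stage; sorted output
        # has to be fully buffered before downstream stages can consume it.
        return self.sort_output


def execution_order(stages: List[StageSpec]) -> List[int]:
    """Order stage ids so that every stage runs after all of its inputs."""
    graph = {s.stage_id: set(s.inputs) for s in stages}
    return list(TopologicalSorter(graph).static_order())


# Assumed example: two scans feed a join, which feeds a final aggregation.
stages = [
    StageSpec(0, shuffle_key=("customer_id",)),   # scan of a large table
    StageSpec(1),                                  # scan of a small table
    StageSpec(2, inputs={0: "partitioned", 1: "broadcast"},
              shuffle_key=("region",), sort_output=True),
    StageSpec(3, inputs={2: "partitioned"}),
]
order = execution_order(stages)
```

Because each stage names a set of input stages, the stages form a DAG, and a topological sort gives one valid order in which a coordinator could launch them.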




[GitHub] [druid] gianm edited a comment on issue #12262: Multi-stage distributed queries

Posted by GitBox <gi...@apache.org>.
gianm edited a comment on issue #12262:
URL: https://github.com/apache/druid/issues/12262#issuecomment-1055125017


   > Gian, Are there any algorithms that would be made possible by this architecture? (By algorithms, I mean distributed implementations of relational operators, e.g. shuffle-sort, shuffle-join, broadcast-join, hybrid-hash-join, use of bloom filters.) If so, can you list the algorithms that you have in mind?
   
   @julianhyde I had in mind that a "partition-sort-shuffle-merge" operation would be the basic building block. The "execution coordinator" I mentioned should understand those, and for each stage should accept parameters like:
   
   - set of input stages (this is what makes it a DAG)
   - which input stages should be broadcast and which should be partitioned
   - shuffle key for output
   - whether the shuffled output needs to be sorted (if not, we can stream through the stage; if so, we need a buffer between stages)
   
   With that building block I think the most useful algorithms to implement at first would be shuffle sort (trivial given the building block), sort-based aggregation, sort-merge join, broadcast hash join, and shuffle hash join.
   
   Btw: I think certain of these algorithms would be hybrid in some way if you consider the flow end to end. For example: by "sort-based aggregation" I just mean the distributed part. I had imagined the part that happens local to the leaf server being hash-based, followed by a sort-based distributed step.
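
The hybrid flow described above (hash-based aggregation local to each leaf, followed by a sort-based distributed step) can be sketched roughly as follows. This is a single-process illustration under simplifying assumptions: the aggregation is a plain sum, each "leaf" is an in-memory list, and the function names are hypothetical rather than taken from Druid.

```python
import heapq
from collections import defaultdict
from itertools import groupby
from operator import itemgetter


def local_hash_aggregate(rows):
    """Leaf-side step: sum values per key using a hash table."""
    totals = defaultdict(int)
    for key, value in rows:
        totals[key] += value
    return totals


def partition_and_sort(totals, num_partitions):
    """Split partial results by key hash, then sort each partition by key."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in totals.items():
        partitions[hash(key) % num_partitions].append((key, value))
    return [sorted(p) for p in partitions]


def merge_aggregate(sorted_runs):
    """Downstream step: merge sorted runs and combine adjacent equal keys."""
    merged = heapq.merge(*sorted_runs)
    return [(key, sum(v for _, v in group))
            for key, group in groupby(merged, key=itemgetter(0))]


# Two assumed leaf servers, each holding some (key, value) rows.
leaves = [
    [("a", 1), ("b", 2), ("a", 3)],
    [("b", 4), ("c", 5)],
]
num_partitions = 2
shuffled = [partition_and_sort(local_hash_aggregate(rows), num_partitions)
            for rows in leaves]

result = {}
for p in range(num_partitions):
    # Each downstream worker receives partition p from every leaf.
    runs = [leaf_parts[p] for leaf_parts in shuffled]
    result.update(merge_aggregate(runs))
```

Since the inputs to the merge step are already sorted by key, the downstream worker only ever needs to hold one group in memory at a time, which is the main appeal of doing the distributed part sort-based.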




[GitHub] [druid] julianhyde commented on issue #12262: Multi-stage distributed queries

Posted by GitBox <gi...@apache.org>.
julianhyde commented on issue #12262:
URL: https://github.com/apache/druid/issues/12262#issuecomment-1061079561


   @gianm When you are doing a big join with filters on both sides - e.g. orders from customers in California for red products - then you only want to read customers who may have bought a red product, and also only want to read orders placed by a customer in California. Thus you want a filter implied by the join to travel both ways, and you can handle a few false positives. A good way is to generate a bloom filter as you are scanning customers and pass it to the scan of products, and vice versa.
   
   In short, approximate semi-joins pushed down to each side of a join as pre-filters.
   
   The paper "[Sideways Information Passing for Push-Style Query Processing](https://repository.upenn.edu/cgi/viewcontent.cgi?article=1045&context=db_research)" from 2008 describes the idea pretty well. But we were doing it in Broadbase in around 1998.
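
As a rough illustration of the pre-filter idea described above, the Python sketch below builds a Bloom filter on each side of a join and hands it to the scan of the other side. Everything here is an assumption made for the example: the tiny `BloomFilter` class, the two-table schema (customers and orders, with the product color folded into the order row), and the fact that each table is scanned twice, whereas a real engine would build and exchange the filters while the scans are running.

```python
import hashlib


class BloomFilter:
    """Tiny illustrative Bloom filter backed by a Python int used as a bit set."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, item):
        # Derive several bit positions by salting one hash function.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def might_contain(self, item):
        # False positives are possible; false negatives are not.
        return all((self.bits >> pos) & 1 for pos in self._positions(item))


customers = [(1, "CA"), (2, "NY"), (3, "CA"), (4, "CA")]    # (customer_id, state)
orders = [(1, "red"), (2, "red"), (3, "blue"), (1, "blue")]  # (customer_id, color)

# Each side summarizes the join keys that survive its own filter.
ca_ids = BloomFilter()
for cid, state in customers:
    if state == "CA":
        ca_ids.add(cid)

red_buyers = BloomFilter()
for cid, color in orders:
    if color == "red":
        red_buyers.add(cid)

# Each scan applies its own predicate plus the Bloom filter from the other side.
customers_read = [c for c in customers
                  if c[1] == "CA" and red_buyers.might_contain(c[0])]
orders_read = [o for o in orders
               if o[1] == "red" and ca_ids.might_contain(o[0])]

# The exact join afterwards discards any false positives the filters let through.
ca_customer_ids = {cid for cid, _ in customers_read}
joined = [(cid, color) for cid, color in orders_read if cid in ca_customer_ids]
```

The filters only reduce how many rows each scan emits; correctness still comes from the exact join, which is why a few false positives are tolerable.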

