Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/11/16 21:39:33 UTC

[GitHub] [druid] paul-rogers opened a new issue #11933: Discussion: operator structure for Druid queries?

paul-rogers opened a new issue #11933:
URL: https://github.com/apache/druid/issues/11933


   This issue seeks to spur discussion around an idea to make the Druid code a bit simpler and faster: adopt the standard operator structure for the historical nodes (at least).
   
   Currently, Druid's query engine is based on a unique structure built around the `QueryRunner`, `Sequence`, `Yielder` and related concepts. It uses a clever functional-programming based approach. However, it turns out that the resulting code is far more complex than that of similar tools that use the well-known operator structure.
   
   This is not (yet) a proposal. Instead, it is meant to start the conversation.
   
   ### Background
   
   Most query engines use a structure derived from the earliest [System R](https://en.wikipedia.org/wiki/IBM_System_R) architecture and clarified by the well-known [Volcano paper](https://www.seas.upenn.edu/~zives/03s/cis650/P209.PDF). The idea is that a query is a sequence of operators, each of which reads from its input, transforms the data, and makes that data available to the output.
   
   When used in a query with joins, the result is a DAG of operators. When used in a query without joins, or in an ETL pipeline, the result is a simple pipeline of operators. For example:
   
   ```text
   Scan --> Merge --> Sort --> Limit --> Consumer
   ```
   
   The Volcano paper suggested a specific way to implement the above: each operator is, essentially, an iterator. It provides a `next()` call which returns the next record (or, in many systems, the next batch of records). When implemented in the Volcano style, the code is relatively simple, easy to debug and easy to instrument.
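   
   To make the iterator idea concrete, here is a minimal sketch in Java. The `Operator` interface and the `ListScan`, `Limit` and `drain` names are hypothetical, invented for illustration; they are not Druid's API or the prototype's classes.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   
   final class VolcanoSketch {
     /** Hypothetical operator: simply an iterator over rows. */
     interface Operator<T> extends Iterator<T> {}
   
     /** Leaf operator: "scans" an in-memory list instead of a segment. */
     static final class ListScan<T> implements Operator<T> {
       private final Iterator<T> rows;
       ListScan(List<T> rows) { this.rows = rows.iterator(); }
       @Override public boolean hasNext() { return rows.hasNext(); }
       @Override public T next() { return rows.next(); }
     }
   
     /** Limit operator: its only state is the count of rows returned so far. */
     static final class Limit<T> implements Operator<T> {
       private final Operator<T> input;
       private final long limit;
       private long returned;
       Limit(Operator<T> input, long limit) { this.input = input; this.limit = limit; }
       @Override public boolean hasNext() { return returned < limit && input.hasNext(); }
       @Override public T next() {
         if (!hasNext()) throw new NoSuchElementException();
         returned++;
         return input.next();
       }
     }
   
     /** The consumer pulls from the root; each pull cascades down the pipeline. */
     static <T> List<T> drain(Operator<T> root) {
       List<T> out = new ArrayList<>();
       while (root.hasNext()) out.add(root.next());
       return out;
     }
   
     public static void main(String[] args) {
       Operator<String> plan = new Limit<>(new ListScan<>(Arrays.asList("a", "b", "c")), 2);
       System.out.println(drain(plan));  // prints [a, b]
     }
   }
   ```
   
   With the debugger stopped inside `ListScan.next()`, the call stack in this sketch would contain only `drain`, `Limit.next()` and `ListScan.next()`: one frame per operator.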
   
   The solution also works for parallel execution. For example, the [Apache Storm](https://storm.apache.org/) project runs each operator concurrently. [Apache Drill](https://drill.apache.org/) and [Apache Impala](https://impala.apache.org/) parallelize "slices" (or "fragments") of a query (where a slice/fragment is a collection of operators.) Presto/[Trino](https://trino.io/) seems to try to get closer to the Storm ability to run (some?) operators concurrently.
   
   Operators are typically stateful. An aggregation maintains its running totals, a limit maintains a count of the records returned thus far, and an ordered merge maintains information about the current row from each input. Because operators are stateful, they can also maintain instrumentation, such as row/batch counts, time spent, and so on. Many products display the resulting metrics as a [query profile](https://drill.apache.org/docs/query-profiles/) ([proposal for Druid](https://github.com/apache/druid/issues/11800)).
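   
   As a rough illustration of state plus instrumentation, the sketch below shows a hypothetical `Sum` aggregation operator that keeps a running total and records the kind of counters a query profile might report. The class and its metric names are assumptions made for this example only.
   
   ```java
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.NoSuchElementException;
   
   final class StatefulOperatorSketch {
     /** Hypothetical aggregation operator: sums its input and emits one row. */
     static final class Sum implements Iterator<Long> {
       private final Iterator<Long> input;
       private boolean done;
       private long rowsIn;      // instrumentation: rows consumed
       private long nanosSpent;  // instrumentation: time spent here, including the input
   
       Sum(Iterator<Long> input) { this.input = input; }
   
       @Override public boolean hasNext() { return !done; }
   
       @Override public Long next() {
         if (done) throw new NoSuchElementException();
         long start = System.nanoTime();
         long total = 0;  // the running total
         while (input.hasNext()) {
           total += input.next();
           rowsIn++;
         }
         done = true;
         nanosSpent += System.nanoTime() - start;
         return total;
       }
   
       long rowsIn() { return rowsIn; }
       long nanosSpent() { return nanosSpent; }
     }
   
     public static void main(String[] args) {
       Sum sum = new Sum(Arrays.asList(1L, 2L, 3L).iterator());
       System.out.println("total=" + sum.next() + " rowsIn=" + sum.rowsIn());
     }
   }
   ```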
   
   Operators are quite easy to test: they are completely ignorant of their inputs and outputs, allowing simple unit tests outside of the entire query system. Quick and easy testing encourages innovation: it becomes low-risk to invent a new operator, try to improve an existing one, and so on.
   
   In typical SQL systems, the query planner creates the operators (or, more typically, a description of the operator.) The Calcite planner does that in Druid: it creates an execution plan with "logical" operators (that is, descriptions of operators.) Here, however, we'll focus on Druid's native queries which use a different approach to planning.
   
   Most systems provide joins as a fundamental operation. In this case, the structure becomes a tree with each join having two inputs. The operator structure has proven to work quite well in such structures.
   
   ### Druid's Query Stack
   
   Druid adopts a rather unique approach to a query pipeline. Rather than dividing work into operators, Druid uses a functional programming approach in which the code tries to be stateless. A `QueryRunner` runs (actually, plans) part of the query, returning a `Sequence` that provides the results. The `Sequence` does not actually return the results; instead, it provides an aggregation interface to return aggregated results. A `Yielder` builds on this approach to return individual "events" (Druid's term for a row or batch). Druid makes extensive use of anonymous inner classes, lambdas and closures as a "back-door" to provide the state necessary to perform most operations.
   
   The result is far more complex than other, similar query implementations. Quite a bit of time is spent navigating the large number of code layers that result.
   
   All that said, the code clearly works quite well and has stood the test of time. The question is, can we do even better?
   
   ### The Question: Operators for Druid?
   
   This issue asks the question: could Druid be faster, easier to maintain and more extensible if we were to adopt the classic operator approach?
   
   Part of any such discussion is the cost: does this mean a complete rewrite of Druid? What is being proposed is not a rewrite. It is simply a refactoring of what already exists: keep the essence of each operation, but host it in an operator rather than in layers of sequences, yielders, etc.
   
   ### Summary of Possible Approach
   
   A [prototype](https://github.com/paul-rogers/druid/tree/pipe) has been completed of the Scan query stack. Scan query was chosen because it is the simplest of Druid's native query types. The general approach is this:
   
   The `QueryRunner` implementations already perform planning and return a sequence with the results. The prototype splits this functionality: the planner part is moved into a planner abstraction which creates a required set of operators. The operators themselves perform the run-time aspect currently done by the `Sequence` and `Yielder` classes.
   
   The above approach reflects the way in which Druid does "just-in-time planning": native queries are planned during the `QueryRunner.run()` call, so we simply move this logic into an explicit "just-in-time" planner. That planner also does what the `QuerySegmentWalker` does to plan the overall structure of the query.
   
   As it turns out, most `QueryRunner`s don't actually do anything with data: they instead do planning steps. By moving this logic into the planner, the required set of operators becomes far smaller than the current set of `QueryRunner`s.
   
   With planning done in the planner, the operators become very simple. For the scan, we have operators such as the segment scan, merge, and limit. Developers typically try to optimize the data path to eliminate all unnecessary code. With the operator approach, the only code in the data path is that which obtains or transforms events: all "overhead" is abstracted away.
   
   ### Example
   
   The easiest way to summarize the potential benefits of the operator approach is with an example. Here is the call stack for a scan query with the debugger stopped in the function which retrieves data from segments:
   
   <img src="https://user-images.githubusercontent.com/3248881/141379552-8fcb734c-2f84-416d-886e-676196bb0baf.png" width=400 />
   
   Here is the exact same query using the [code refactored to use operators](https://github.com/paul-rogers/druid/tree/pipe):
   
   <img src="https://user-images.githubusercontent.com/3248881/141379609-9f4b5f91-c128-4e81-8e7e-fc35e00ceb9b.png" width=400 />
   
   Full disclosure: the scan query is, by far, the simplest native query type in Druid. More work is needed to try the same approach in the more heavy-duty query types such as top-n or group-by.
   
   ### Possible Implementation Path
   
   Changing Druid's query execution path is a bit like brain surgery: Druid is used in production by a large number of people, so we can't risk breaking the query path. Hence, we need a low-risk approach if we want to explore this option.
   
   The safest option is to implement the operator approach in parallel with the current approach: enable operators via a feature flag. And, indeed that is how the stack traces above were created.
   
   Conversion of a native query can be incremental as well. Wrappers allow an operator to act like a `Sequence` and vice versa, allowing us to migrate one piece of query execution at a time.
   
   The development approach could thus be to tackle one native query at a time. Within each, convert one operation at a time. At each step, enable the operator version via a feature flag, defaulting to the operator version only after sufficient testing.
   
   ### Potential Benefits
   
   We've already mentioned some benefits: simpler code, a tighter data execution path, easier debugging, and easier testing. Other possible benefits include:
   
   * Easier to add new operations: just insert an operator; the other operators are completely agnostic.
   * Easier to add new data sources. Want to query Parquet or CSV along with Druid? Just add a reader operator. (This is what makes Drill, Impala and Trino so flexible.)
   * Easier to optimize. Operators provide simple, guaranteed places to obtain and release resources, allowing easier experimentation with direct memory, alternative data layouts and so on.
   * Easier to explain: if the query profile is a direct representation of the operator tree (rather than a fiction, as in the current proposal), then users will have a much better understanding of how Druid executes their queries, which allows users to look for more efficient alternatives to gain additional performance. (The old adage: if you can't measure it, you can't improve it.)
   
   ### Summary
   
   The operator approach has many benefits. A [prototype](https://github.com/paul-rogers/druid/tree/pipe) shows that the approach works well for the native scan query. The question that this issue raises is this: is this something that would provide sufficient benefit to the Druid community that we'd want to move to a concrete proposal?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] paul-rogers commented on issue #11933: Discussion: operator structure for Druid queries?

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on issue #11933:
URL: https://github.com/apache/druid/issues/11933#issuecomment-972087862


   ### Answers
   
   With that background, let's answer the questions you raised.
   
   *Different implementation*: The prototype is a refactoring of the existing scan query code, as described above. Since the planner works differently than how the toolchest, etc. works, it was not possible to directly use those in the prototype. Instead, the prototype uses those parts which work as-is, and made a refactored copy of those bits that need to change to work in the context of the new planner. Basically, those parts, such as `mergeResults()`, which build `QueryRunner`s are mapped into a different, but parallel, implementation in the prototype operator planner. If we were to proceed, that code should move to the toolchest, since the toolchest represents the query-type behavior. In other cases, since this is a refactoring, I kept whatever I could, even if that meant the prototype is somewhat more scattered than a final implementation would be.
   
   *Operator vs. Sequence*: Clearly, the `Sequence` idea was meant to represent the result set, without the caller actually having to work with the rows in the result set. The accumulate method would handle any per-row operations. Hence, `Sequence` reifies (represents as an object) the result set, but not the operations on that result set.
   
   By contrast, the `Operator` model reifies (represents as an object) operations, while leaving the result set as a non-reified abstraction. This is important when working with billions of rows: one can never hope to hold the whole result set, one is just building a pipeline that the rows flow through. Hence, the focus on the "pipe fittings", not the water. (Storm made this idea very explicit back in the day.)
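   
   The contrast can be sketched with two toy types. `MiniSequence` below is a deliberately simplified stand-in invented for this example (it is not Druid's `Sequence` interface), and `Doubler` is a hypothetical operator.
   
   ```java
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;
   import java.util.function.BiFunction;
   
   final class ReifySketch {
     /** Sequence style: the result set is the object; callers push a function into it. */
     interface MiniSequence<T> {
       <R> R accumulate(R initial, BiFunction<R, T, R> accumulator);
     }
   
     static <T> MiniSequence<T> sequenceOf(List<T> rows) {
       return new MiniSequence<T>() {
         @Override public <R> R accumulate(R initial, BiFunction<R, T, R> accumulator) {
           R acc = initial;
           for (T row : rows) acc = accumulator.apply(acc, row);
           return acc;
         }
       };
     }
   
     /** Operator style: the operation is the object; callers pull rows through it. */
     static final class Doubler implements Iterator<Integer> {
       private final Iterator<Integer> input;
       Doubler(Iterator<Integer> input) { this.input = input; }
       @Override public boolean hasNext() { return input.hasNext(); }
       @Override public Integer next() { return 2 * input.next(); }
     }
   
     public static void main(String[] args) {
       Integer sum = sequenceOf(Arrays.asList(1, 2, 3)).accumulate(0, (acc, row) -> acc + row);
       System.out.println(sum);  // prints 6
   
       Iterator<Integer> doubled = new Doubler(Arrays.asList(1, 2, 3).iterator());
       while (doubled.hasNext()) System.out.println(doubled.next());  // prints 2, 4, 6
     }
   }
   ```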
   
   Which is "right"? Neither. The question is, which is fastest and simplest. Hence this discussion.
   
   *Semantic difference*: To see the difference in the two approaches, one has to pop up from the code to see the design. We've discussed the design above. The proposal separates planning and execution. It also combines a pile of sequences, yielders, lambdas, closures and other knick-knacks into a simple, clean operator structure. Why? For the reasons above: simpler to understand, easier to test, easier to debug, easier to reuse.
   
   Said another way, both the current design and the prototype run a scan query. In that sense they are the "same." The proposal suggests how we can do the same work with far less code and complexity by leveraging the idea of accepting that a query is a pipeline of stateful operators.
   
   The current code tries to follow the functional programming model to be stateless. But, since queries are, in fact, stateful, the code becomes very complex: state is often added via the "back door" of closures. The operator model accepts that queries have state and manages that state cleanly.
   
   *Stack trace is so much cleaner*: you are on the right track. The reason the stack trace is simpler is that, at runtime, we do only the required work, with no overhead and no unnecessary abstractions. A limit operator counts its rows and says it's done when it's done. That's all. That can be implemented right in the operator. Since operators are very lightweight, there is no reason to have multiple layers of abstraction. The stack trace is smaller because operators are the KISS solution for queries.
   
   Yes, if someone were to implement operators by adding multiple layers of abstraction, then it would be ugly. The point is, those layers are not necessary. Every operator has one very simple task: get some input, do a transform, produce an output. If you want to do two different things, create two operators. We can do this because the planner will choose wisely about which operator to include: we don't include operators we don't need. Operators don't have to be two things. The result is a very simple runtime stack. And, as we've all learned, the name of the game in performance is to minimize the amount of code in the per-row path.
   
   *WrappingSequence, etc.*: You point out the many uses of wrapping sequences. Yes, those are a pain. Many seem to want to do something on sequence start or end, but end up having to insert themselves into the per-batch stream to do that. (We're really in the weeds now.)
   
   To address this, the `Operator` abstraction added a trick, just for Druid. In most implementations, the open method is `void open()`. For Druid, it is `Iterator open()`: the call returns the iterator to use to fetch rows. This allowed the prototype to emulate the “open/close-only” operators by returning their child on `open()`, making the per-row call stack simpler.
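   
   A minimal sketch of that trick follows, assuming a two-method protocol. The `Operator` interface here only mirrors the `Iterator open()` shape described above; `ListScan`, `Timer` and `run` are hypothetical names, and the prototype's real classes may differ.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;
   
   final class OpenTrickSketch {
     /** Assumed protocol: open() hands back the iterator used to fetch rows. */
     interface Operator<T> {
       Iterator<T> open();
       void close();
     }
   
     static final class ListScan<T> implements Operator<T> {
       private final List<T> rows;
       ListScan(List<T> rows) { this.rows = rows; }
       @Override public Iterator<T> open() { return rows.iterator(); }
       @Override public void close() {}
     }
   
     /** An "open/close-only" operator: it acts at start and end, never per row. */
     static final class Timer<T> implements Operator<T> {
       private final Operator<T> child;
       private long startNanos;
       long elapsedNanos = -1;
       Timer(Operator<T> child) { this.child = child; }
       @Override public Iterator<T> open() {
         startNanos = System.nanoTime();
         return child.open();  // return the child's iterator: this operator leaves the data path
       }
       @Override public void close() {
         child.close();
         elapsedNanos = System.nanoTime() - startNanos;
       }
     }
   
     static <T> List<T> run(Operator<T> root) {
       List<T> out = new ArrayList<>();
       try {
         Iterator<T> it = root.open();
         while (it.hasNext()) out.add(it.next());
       } finally {
         root.close();
       }
       return out;
     }
   
     public static void main(String[] args) {
       Timer<Integer> timer = new Timer<>(new ListScan<>(Arrays.asList(1, 2, 3)));
       System.out.println(run(timer));  // prints [1, 2, 3]
     }
   }
   ```
   
   In this sketch the per-row calls go straight to the scan's iterator; `Timer` never appears in the per-row stack.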
   
   Could we do the same with the current code? Sure, it's *soft*ware after all. Would doing so achieve the other objective? That’s the discussion we’re having.
   
   *QueryToolChest and QueryRunnerFactories are extension points*: Absolutely. This is modeled, in the prototype, by the `QueryTypePlanner` interface in the “stub” historical planner. As noted above, any code in the `QueryTypePlanner` class would, if we were to proceed, migrate to the classes you mentioned.
   
   *Swapping out the underlying implementation in-place creates a relatively high burden*: You have hit on the major discussion point. The reason that the prototype is a refactoring of the scan query path is to minimize the risk of breaking anything. Of course, the scan query is far simpler than the other types. We'd want to investigate whether those others lend themselves to the same ease of refactoring as the scan query.
   
   One answer is testing. Let's assume we have good tests for each query type. Then, we simply rerun those tests to ensure things work. This worked quite well when we made these kinds of changes in Drill and Impala to "modernize" some bit of code or other.
   
   The first step of the refactoring here replaced just the segment reader. Existing unit tests passed just fine. Then, as each `QueryRunner` was refactored to create an operator, a new unit test was created for that operator. (Remember, operators are easy to test one-by-one.) I've not yet figured out how to run integration-level tests with the new code, but that would be the next step.
   
   To ensure safety, if we were to proceed, I'd suggest working step-by-step, in whatever sequence makes sense. First rely on unit tests. Then run integration tests.
   
   Further, for a while, provide an option to choose the "classic" or "operator" path. That is possible because the refactoring will move code to new classes, leaving the original ones unchanged.
   
   Still, risk reduction is a key question: the thought is to take on some risk now to reduce risk later. (That is, we get simpler code, perhaps faster execution, easier extensions, easier testing later, by taking on a bit of risk today.) This is what we should discuss.
   
   *The proposal is just a stylistic adjustment to the current code without any new or different semantic constructs being added*: Good point. The idea is first to align ourselves with best practices: simple code, easy to test, etc. Then, we can add actual functionality (read Parquet files, materialized views, multi-level DAGs, integrate Druid with other query engines.) If we were to propose moving to operators AND doing any of the above, the proposal would be even larger!
   
   There is more than style to this proposal (see above.) This is a different functional way to achieve the goal. In the same way, Druid is not a stylistic difference from Oracle, Teradata, Hadoop or Hive: it is a functional difference, and that is what makes Druid successful. In the same way that segments are not just a different style for storing rows (they are a faster way), operators are a faster way to run a query than the current implementation. Both work; operators have simply proven to be simpler and faster in other tools.
   
   So, another good topic for discussion is: where is Druid going? If we want extensions, let's make extensions super simple. If we want more query types, let's make that super simple. If we want more testing, faster development, easier debugging, let's use techniques which have worked well elsewhere.
   
   *Some parts of the physical plan cannot be determined until the query reaches the segment*. Sure. That's called "schema-on-read." A filter operator (say), needs to filter one way if a given column is a string vs. an integer. Drill works that way. However, the filter operator itself can be determined at plan time. (Yes, I know filter is a poor example because of how Druid works, but let's run with it.)
   
   Another example is a merge. If we read two or more segments, we have to merge them. If we read one, no merge is needed. This kind of thing can be optimized by inserting a merge which, if it finds it has only one available input, just returns that input with no merge.
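   
   The single-input shortcut might look roughly like the sketch below. It assumes an ordered merge over plain Java iterators; `MergeSketch`, `merge`, `OrderedMerge` and `Head` are hypothetical names, not classes from Druid or the prototype.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.Comparator;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   import java.util.PriorityQueue;
   
   final class MergeSketch {
     /** Decides at run time whether a real merge is needed. */
     static <T> Iterator<T> merge(List<Iterator<T>> inputs, Comparator<T> order) {
       List<Iterator<T>> live = new ArrayList<>();
       for (Iterator<T> in : inputs) {
         if (in.hasNext()) live.add(in);  // drop inputs that returned no rows
       }
       if (live.isEmpty()) return Collections.emptyIterator();
       if (live.size() == 1) return live.get(0);  // one input: hand it back, no merge
       return new OrderedMerge<>(live, order);
     }
   
     /** The current row from one input, plus where to fetch the next one. */
     static final class Head<T> {
       T row;
       final Iterator<T> source;
       Head(T row, Iterator<T> source) { this.row = row; this.source = source; }
     }
   
     /** Ordered merge: state is the current row from each input. */
     static final class OrderedMerge<T> implements Iterator<T> {
       private final PriorityQueue<Head<T>> heads;
       OrderedMerge(List<Iterator<T>> inputs, Comparator<T> order) {
         Comparator<Head<T>> byRow = (a, b) -> order.compare(a.row, b.row);
         heads = new PriorityQueue<>(byRow);
         for (Iterator<T> in : inputs) heads.add(new Head<>(in.next(), in));
       }
       @Override public boolean hasNext() { return !heads.isEmpty(); }
       @Override public T next() {
         Head<T> head = heads.poll();
         if (head == null) throw new NoSuchElementException();
         T row = head.row;
         if (head.source.hasNext()) {
           head.row = head.source.next();
           heads.add(head);
         }
         return row;
       }
     }
   
     public static void main(String[] args) {
       List<Iterator<Integer>> inputs = new ArrayList<>();
       inputs.add(Arrays.asList(1, 4).iterator());
       inputs.add(Arrays.asList(2, 3).iterator());
       Iterator<Integer> merged = merge(inputs, Comparator.<Integer>naturalOrder());
       while (merged.hasNext()) System.out.println(merged.next());  // prints 1, 2, 3, 4
     }
   }
   ```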
   
   In reviewing the scan code (which, as noted, is a super-simple query type), most decisions can be made in the plan step, with just a few being reactive at run time. (Did a segment return any rows? Did it return two cursors or just one? Can we skip a segment because we've reached our limit?) These rather simple reactive decisions were just made part of the operator.
   
   *Understanding of what is different*. To summarize: focus on the goal: leverage proven techniques to gain benefits such as code simplicity, faster per-row execution, easier testing, etc. What are those proven techniques? The operator pipeline model, with the pipeline created by a planner (in Druid's case, a "just in time" planner derived from the `QueryRunner` code.) Key point: I didn't make up the operator idea; it is well known. All this proposal does is apply those techniques to Druid via refactoring.
   
   To be very succinct: Druid has a query pipeline. Query pipelines have proven to be simplest when they use a Volcano-style operator structure. The current code shows that functional programming techniques, while very helpful elsewhere, result in code that is more complex (and likely slower) than the operator technique when used to implement a query pipeline.
   




[GitHub] [druid] paul-rogers commented on issue #11933: Discussion: operator structure for Druid queries?

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on issue #11933:
URL: https://github.com/apache/druid/issues/11933#issuecomment-973471291


   @cheddar kindly walked me through the current design. (He is the original author.) Updated the issue description with the result. In a nutshell, he points out that the code complexity we see today may be the result of over-eager use of anonymous inner classes and closures, not a result of the `Sequence` abstraction. We agreed I'd try a refactoring of some handy candidate to see how this might work in practice. Stay tuned.




[GitHub] [druid] paul-rogers commented on issue #11933: Discussion: operator structure for Druid queries?

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on issue #11933:
URL: https://github.com/apache/druid/issues/11933#issuecomment-974603915


   ### Resource Management
   
   A conversation with @cheddar revealed one of the key goals of the `Sequence` abstraction: the ability to manage resources. It works so well that he says Druid has seldom experienced a query-related resource leak. Pretty darn good!
   
   As @cheddar observed, an operator is little more than a closeable iterator, with some goodies tacked on, and closeable iterators are resource leaks waiting to happen.
   
   There is more than one way to handle the problem, however. As it turned out, Drill originally did the "every operator has to ensure it is closed" thing. It was a complex mess. (We didn't have `Sequence` to help.) Then Drill let its fragment runner do the work. The code became much simpler and resource leaks disappeared, even in nasty error situations.
   
   The operator prototype learned from Drill's experience. The operator "DAG" is managed by a fragment runner which knows of every operator in the DAG, and which ensures that, come hell or high water, the operators are closed. The result is that the data path (`hasNext()`/`next()`) is as simple as possible since it need not handle every possible error case that needs closing. Operators can release resources early if they can, but they don't have to: they're still guaranteed to be released at the end.
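   
   The idea can be sketched as a small registry. Everything in this example is hypothetical (`FragmentContext`, `register`, `TrackedOp`), and closing in reverse registration order is an assumption made here; neither Drill's nor the prototype's actual runner is reproduced.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.List;
   
   final class FragmentSketch {
     /** Hypothetical registry that knows every operator in one fragment. */
     static final class FragmentContext implements AutoCloseable {
       private final Deque<AutoCloseable> operators = new ArrayDeque<>();
   
       <T extends AutoCloseable> T register(T operator) {
         operators.push(operator);
         return operator;
       }
   
       /** Closes every registered operator, most recent first, even if some fail. */
       @Override public void close() {
         RuntimeException failure = null;
         while (!operators.isEmpty()) {
           try {
             operators.pop().close();
           } catch (Exception e) {
             if (failure == null) failure = new RuntimeException("operator close failed", e);
             else failure.addSuppressed(e);
           }
         }
         if (failure != null) throw failure;
       }
     }
   
     /** Stand-in operator that records when it is closed. */
     static final class TrackedOp implements AutoCloseable {
       private final String name;
       private final List<String> log;
       TrackedOp(String name, List<String> log) { this.name = name; this.log = log; }
       @Override public void close() { log.add("close " + name); }
     }
   
     /** The data path fails, yet both operators are still closed. */
     static List<String> runFailingQuery() {
       List<String> log = new ArrayList<>();
       try (FragmentContext context = new FragmentContext()) {
         context.register(new TrackedOp("scan", log));
         context.register(new TrackedOp("limit", log));
         throw new IllegalStateException("simulated failure in the data path");
       } catch (IllegalStateException e) {
         log.add("failed: " + e.getMessage());
       }
       return log;
     }
   
     public static void main(String[] args) {
       System.out.println(runFailingQuery());
     }
   }
   ```
   
   Note that the code that throws contains no cleanup logic at all; the close calls live entirely in the context.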
   
   So, operators solve the "close problem" while providing a super-simple (and efficient) data path. This is why the pattern is widely used in many projects. This is reflected in the stack traces above: the current code has extra layers, in part, to ensure that we use accumulators that can close resources. The operator stack is simpler because resource management sits on the side, discreetly out of sight of the data path.
   
   We could certainly use the same trick with a `Sequence`: register each one with a sequence context, and that context can ensure everything is closed. While that would make the yielder code a bit simpler, it would still leave substantial complexity. Let's tackle that next.




[GitHub] [druid] paul-rogers commented on issue #11933: Discussion: operator structure for Druid queries?

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on issue #11933:
URL: https://github.com/apache/druid/issues/11933#issuecomment-974741126


   ### Migration Approach
   
   @cheddar asked another great question: Suppose we wanted to consider the operator approach. How would we do it without breaking things or hurting performance?
   
   To prove the operator concept, the full prototype presents a "whole enchilada" solution: convert sequences to operators, and convert query runners into a planner. However, this is far too much work to do in one go in the production code. What steps might we consider that produce value at each step?
   
   We'd start with refactoring each query runner when convenient, such as when we need to work in that area anyway, leaving the rest unchanged. Test each step. Once all are converted, consider refactoring the query runners to create an integrated planner.
   
   [Yet another prototype](https://github.com/paul-rogers/druid/tree/op-step1) shows how this could be done.
   
   ### Refactor Query Runners
   
   At first we only focus on individual query runners and their sequences. Today, the query "DAG" looks a bit like this:
   
   ```text
   input sequence --> yielder --> ad-hoc code --> output sequence
   ```
   
   Where the query runner does its thing in the ad-hoc code, then wraps it in a sequence. The ad-hoc code is often in the form of anonymous inner classes with state passed in via closures. The result is that the code is a bit hard to understand, debug and test. So, the first step is to fix that by applying the operator pattern within each query runner:
   
   ```text
   input sequence --> yielder --> operator --> output sequence
   ```
   
   Aside on naming: these notes use the term "operator" for the bit that does the actual row/batch transforms since the term "relational operator" is common in the literature. However, it doesn't matter what we call it: "sequence transformer", "data munger" or whatever.
   
   Compared with the ad-hoc code, an operator:
   
   * Is a named, static class that can be unit tested independent of its "host" query runner.
   * Uses indirection so that the operator does not depend on any specific input.
   
   In practice, the yielder abstraction is a bit awkward to use as an input iterator. So, add another wrapper:
   
   ```text
                                    shim       transform
   input sequence --> yielder --> operator --> operator --> output sequence
   ```
   
   Where the "shim operator" makes a yielder look like an operator, and the "transform operator" is the "guts" of what the query runner does.
   
   [A `Supplier` abstraction](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/main/java/org/apache/druid/query/pipeline/Operators.java#L145) runs the child query runner, obtains its output sequence, and wraps that sequence in the shim operator. This decouples each operator from whatever produces the input. To the operator, it looks like it gets an operator as input, though the actual input might be a sequence. (Sorry, it gets a bit confusing as we add layers...)
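   
   A rough sketch of the shim and the supplier indirection follows. `MiniYielder` is a simplified stand-in invented for this example (Druid's real `Yielder` has a different, richer interface), and `YielderShim`, `yielderOf` and `input` are hypothetical names.
   
   ```java
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   import java.util.function.Supplier;
   
   final class ShimSketch {
     /** Simplified stand-in for a yielder: a cursor that hands back its successor. */
     interface MiniYielder<T> {
       T get();
       MiniYielder<T> next();
       boolean isDone();
     }
   
     /** Toy yielder over a list, standing in for a child query runner's output. */
     static <T> MiniYielder<T> yielderOf(List<T> rows, int position) {
       return new MiniYielder<T>() {
         @Override public T get() { return rows.get(position); }
         @Override public MiniYielder<T> next() { return yielderOf(rows, position + 1); }
         @Override public boolean isDone() { return position >= rows.size(); }
       };
     }
   
     /** Shim operator: makes a yielder look like an ordinary iterator. */
     static final class YielderShim<T> implements Iterator<T> {
       private MiniYielder<T> yielder;
       YielderShim(MiniYielder<T> yielder) { this.yielder = yielder; }
       @Override public boolean hasNext() { return !yielder.isDone(); }
       @Override public T next() {
         if (yielder.isDone()) throw new NoSuchElementException();
         T row = yielder.get();
         yielder = yielder.next();
         return row;
       }
     }
   
     /** Supplier indirection: the transform operator never sees what produced its input. */
     static <T> Iterator<T> input(Supplier<MiniYielder<T>> childRunner) {
       return new YielderShim<>(childRunner.get());
     }
   
     public static void main(String[] args) {
       List<String> rows = Arrays.asList("x", "y");
       Supplier<MiniYielder<String>> child = () -> yielderOf(rows, 0);
       Iterator<String> in = input(child);
       while (in.hasNext()) System.out.println(in.next());  // prints x, then y
     }
   }
   ```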
   
   The new prototype uses the [operators](https://github.com/paul-rogers/druid/tree/op-step1/processing/src/main/java/org/apache/druid/query/pipeline) from the first prototype, and adds the needed [adapters](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/main/java/org/apache/druid/query/pipeline/Operators.java). Then, rather than wholesale restructuring of the query runners into a query planner, this prototype just fits operators into the existing query runner structure. We create a [replacement query runner](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/main/java/org/apache/druid/query/pipeline/LimitAndOffsetRunner.java) for the part of the scan query that handles limit and offset.
   
   Then, a bit of a hack is added so that the [`ScanQueryQueryToolChest`](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java#L70) can execute either the original sequence-based limit and offset, or the new operator-based one, depending on a hard-coded switch.
   
   Now that the operators are named classes with few dependencies, it is easy to [unit test](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/test/java/org/apache/druid/query/pipeline/ScanQueryOperatorsTest.java) them using a mock input source.
   
   ### Fragment Context
   
   Today, Druid relies on the sequence protocol to ensure resources are released. The next step is to begin to transfer that task to a "fragment context": the thing which manages the fragment of the query running on this one node. At this point, the fragment context is redundant, but we'll need it for the next step.
   
   We define a fragment context as a registry of operators. The fragment context is passed into each query runner. The new prototype does this with the help of the [`ResponseContext`](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java#L359) since that's all that's available. It ain't pretty, but it works for this prototype. [Each runner registers](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/main/java/org/apache/druid/query/pipeline/LimitAndOffsetRunner.java#L138) the operator(s) it creates. 
   
   Some [hackery](https://github.com/paul-rogers/druid/blob/op-step1/server/src/main/java/org/apache/druid/server/QueryLifecycle.java#L290) was done to handle the fragment context, at least for native queries. (Again, not elegant, just a proof-of-concept.)
   
   ### Optimize Away Sequences
   
   At this point, our little section of the scan query "pipeline" looks a bit like this:
   
   ```text
   offset operator --> sequence --> yielder --> shim operator --> limit operator
   ```
   
   Since operators don't care where their input comes from, as long as it looks like an operator and delivers the right data, we can [optimize away](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/main/java/org/apache/druid/query/pipeline/Operators.java#L105) the middle bits above. When we create that shim, we check if the input sequence is "backed" by an operator. If so, we return that operator instead of creating the shim, resulting in:
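   
   The unwrapping check could be sketched as below. `MiniSequence` is again a simplified stand-in rather than Druid's `Sequence`, and `OperatorSequence`, `Shim` and `toOperator` are hypothetical names chosen for illustration; the linked prototype code may be structured differently.
   
   ```java
   import java.util.Arrays;
   import java.util.Iterator;
   
   final class UnwrapSketch {
     /** Simplified stand-in for a sequence plus its yielder. */
     interface MiniSequence<T> {
       Iterator<T> rows();
     }
   
     /** A sequence that only wraps an operator, and remembers which one. */
     static final class OperatorSequence<T> implements MiniSequence<T> {
       final Iterator<T> operator;
       OperatorSequence(Iterator<T> operator) { this.operator = operator; }
       @Override public Iterator<T> rows() { return operator; }
     }
   
     /** Shim: makes any sequence look like an operator, at the cost of one more layer. */
     static final class Shim<T> implements Iterator<T> {
       private final Iterator<T> rows;
       Shim(MiniSequence<T> sequence) { this.rows = sequence.rows(); }
       @Override public boolean hasNext() { return rows.hasNext(); }
       @Override public T next() { return rows.next(); }
     }
   
     /** If the sequence is backed by an operator, return it directly and skip the shim. */
     static <T> Iterator<T> toOperator(MiniSequence<T> sequence) {
       if (sequence instanceof OperatorSequence) {
         return ((OperatorSequence<T>) sequence).operator;
       }
       return new Shim<>(sequence);
     }
   
     public static void main(String[] args) {
       Iterator<Integer> offsetOperator = Arrays.asList(1, 2, 3).iterator();
       Iterator<Integer> limitInput = toOperator(new OperatorSequence<>(offsetOperator));
       System.out.println(limitInput == offsetOperator);  // prints true
     }
   }
   ```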
   
   ```text
   offset operator --> limit operator
   ```
   
   Which then gives us that compact stack shown in the original post (at least for these two operations).
   
   Logically, the sequence is still present, and a query runner that wants to use a sequence can do so. But, if adjacent query runners both create operators, the sequence just melts away, leaving a cleaner, more efficient pipeline.
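
   As a rough illustration of the unwrapping step, here is a minimal sketch. It is not the prototype's code: `MiniSequence`, `OperatorSequence`, `SequenceShim`, `ListOperator` and `OperatorAdapters` are hypothetical names, and both interfaces are heavily simplified stand-ins for Druid's `Sequence` and the prototype's `Operator`.

   ```java
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;

   // Simplified stand-ins; the real Sequence and Operator are richer.
   interface Operator<T>
   {
     boolean hasNext();
     T next();
   }

   interface MiniSequence<T>
   {
     Iterator<T> iterator();
   }

   // A sequence that is "backed" by an operator and remembers which one.
   class OperatorSequence<T> implements MiniSequence<T>
   {
     final Operator<T> operator;

     OperatorSequence(Operator<T> operator) { this.operator = operator; }

     @Override public Iterator<T> iterator()
     {
       return new Iterator<T>()
       {
         @Override public boolean hasNext() { return operator.hasNext(); }
         @Override public T next() { return operator.next(); }
       };
     }
   }

   // The shim: makes an arbitrary sequence look like an operator.
   class SequenceShim<T> implements Operator<T>
   {
     private final Iterator<T> iter;

     SequenceShim(MiniSequence<T> sequence) { this.iter = sequence.iterator(); }

     @Override public boolean hasNext() { return iter.hasNext(); }
     @Override public T next() { return iter.next(); }
   }

   // Hypothetical leaf operator used only for the demo.
   class ListOperator<T> implements Operator<T>
   {
     private final Iterator<T> iter;

     ListOperator(List<T> rows) { this.iter = rows.iterator(); }

     @Override public boolean hasNext() { return iter.hasNext(); }
     @Override public T next() { return iter.next(); }
   }

   class OperatorAdapters
   {
     // If the sequence merely wraps an operator, hand back that operator
     // directly; otherwise fall back to the shim.
     static <T> Operator<T> toOperator(MiniSequence<T> sequence)
     {
       if (sequence instanceof OperatorSequence) {
         return ((OperatorSequence<T>) sequence).operator;
       }
       return new SequenceShim<>(sequence);
     }
   }

   class UnwrapDemo
   {
     public static void main(String[] args)
     {
       Operator<String> offset = new ListOperator<>(Arrays.asList("a", "b"));
       MiniSequence<String> wrapped = new OperatorSequence<>(offset);
       // Same instance comes back: the sequence "melts away".
       System.out.println(OperatorAdapters.toOperator(wrapped) == offset);

       MiniSequence<String> plain = () -> Arrays.asList("c").iterator();
       // A sequence with no operator behind it still gets a shim.
       System.out.println(OperatorAdapters.toOperator(plain) instanceof SequenceShim);
     }
   }
   ```

   The downstream operator only ever sees an `Operator`, so it neither knows nor cares whether the shim was skipped.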
   
   That fragment context we added above ensures that, despite operators being a dreaded "closable iterator", they all are guaranteed to get closed at the completion of each fragment (query running on one node).
   
   ### Query Runners Create Operators
   
   Imagine that the entire query stack is eventually modified to follow the above pattern. Each query runner creates an operator and wraps it in a sequence, which is then unwrapped by the "upstream" operator created by the upstream query runner. At this point, we can just discard the sequences and let each query runner return an operator instead. Any query runner that wants a sequence can have one: it just adds the operator-to-sequence shim.
   
   The present prototype does not show this (not all query runners are converted: just one of them), but this gives an idea of where we could go over time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] paul-rogers commented on issue #11933: Discussion: operator structure for Druid queries?

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on issue #11933:
URL: https://github.com/apache/druid/issues/11933#issuecomment-972029941


   @cheddar, thanks for the comments. I'm impressed that you dug deep into the code! The posted description is kind of high level because I didn't really expect folks to dive in as far as you did. So, let's go into the details. I suppose what we really need is a table that maps the existing concepts to the corresponding operator concepts. I'll add that at some point.
   
   First, let's discuss some of the prototype bits you mentioned to give context for anyone else reading along.
   
   ### Prototype Overview
   
   You did find the key abstraction: the [`Operator`](https://github.com/paul-rogers/druid/blob/pipe/processing/src/main/java/org/apache/druid/query/pipeline/Operator.java) interface. This is just an iterator with `start()` and `close()` methods. In that sense, `Operator` is like a `Sequence`/`Yielder` pair. The difference is that the code to process a batch of records is located entirely in the `Operator.next()` method, meaning that the `Operator` implementation can hold any required state as member variables.
   
   Once we have an `Operator` construct, we can pretty easily [churn out those needed by the Scan Query](https://github.com/paul-rogers/druid/tree/pipe/processing/src/main/java/org/apache/druid/query/pipeline). In this case, the list of operators was created by refactoring the existing code to move the execution bits into an operator class. Basic [unit tests](https://github.com/paul-rogers/druid/tree/pipe/processing/src/test/java/org/apache/druid/query/pipeline) exist for each operator. (One of the nice things about the operator model is that they are easy to test.)
   
   Operators are entirely run-time concepts: they just start, process batches, and close. `QueryRunner`s, as noted, actually do quite a bit of planning. So, the plan-time component moves into a planner abstraction. The planner here is a quick refactor of the scan query `QueryRunner`s into the [HistoricalQueryPlannerStub](https://github.com/paul-rogers/druid/blob/pipe/server/src/main/java/org/apache/druid/server/pipeline/HistoricalQueryPlannerStub.java). This is mostly a proof of concept, created in parallel with the operators: the run-time stuff went into operators, the plan-time stuff into the planner.
   
   The planner works roughly like the `QueryRunner`s: it is a recursive set of functions, each of which builds its operator given its child, and returns it. This is a bit like the way a `QueryRunner` calls its child's `run()`, then creates another `Sequence` around the result. Here, however, we build the operators at plan time, and run them at run time (both of which are just two steps within the overall "run a scan query" task.)
   
   There are two more minor bits. The fragment runner ensures operators are closed whether the query succeeds or fails. The `FragmentContext` is just a holder for all the stuff that an operator might need to do its work. Since the current code is a refactoring, the context just holds the query ID and the response context, which is all the current code uses. The fragment context is just a way of not having to pass common state into every operator constructor.
   
   ### Why are we Considering This?
   
   Let's tackle the main question first, since it motivates everything else: why are we bothering with this prototype? There are several reasons, as outlined in the proposal.
   
   The first thing to note is that the operator concept is not a new idea: it is the standard approach in a bunch of other tools (Spark, Drill, Presto/Trino, Impala, and many others.) So, we're just trying to draft off of what others have learned.
   
   * Operators are independent of one another. An operator accepts inputs, does its thing, and returns results. It does not care what the input is. By contrast, a `QueryRunner` has to know what child to create (often via a helper such as a toolchest.)
   * Separation of planning and execution. Both occur within a single query request. In the current code, the two operations are mixed. With the operator model, a planner decides which operators are needed, then the operators go off and run the query. Classic separation of concerns.
   * Operators are stateful. Almost all query operators, in any tool, are stateful. Limits need a row count. Merges need to keep track of the next row from each input. And so on. Operators provide a simple, clean way to hold that necessary state. The current implementation has state, of course, but it is added via closures, lambdas and all manner of cleverness.
   * Operators are simple. The operator model has evolved over many years and many projects. The result is the simplest possible way to get the job done: often with very few classes, and other constructs. Simple code is faster, easier to understand and easier to test. Again, a classic observation.
   * Simpler to debug. The stack trace shows that the runtime code is simpler: the actual operations are front and center.
   * Perhaps more efficient: Smaller amounts of code and a smaller runtime stack can mean less overhead. A good next step would be to run an actual performance test to determine if there is any actual benefit. (It could be that most of the time is spent in I/O, so reducing CPU won't help.)
   * Easier to test. Each operator is independent of others. We can test a limit operator, say, by using a mock data source ("reader"): the limit has no way to know (or care) where its data comes from. This allows other query engines to have robust, detailed unit tests for each operator.
   * Reusable. Because operators are agnostic, they are reusable. Of course, the current implementation could also be more reusable, but operators are inherently so because they are designed to limit dependencies.
   
   The point of this discussion is: can we get some of this operator goodness for Druid?
   
   ### Answers
   
   With that background, let's answer the questions you raised.
   
   *Different implementation*: The prototype is a refactoring of the existing scan query code, as described above. Since the planner works differently than how the toolchest, etc. works, it was not possible to directly use those in the prototype. Instead, the prototype uses those parts which work as-is, and made a refactored copy of those bits that need to change to work in the context of the new planner. Basically, those parts, such as `mergeResults()`, which build `QueryRunner`s are mapped into a different, but parallel, implementation in the prototype operator planner. If we were to proceed, that code should move to the toolchest, since the toolchest represents the query-type behavior. In other cases, since this is a refactoring, I kept whatever I could, even if that meant the prototype is somewhat more scattered than a final implementation would be.
   
   *Operator vs. Sequence*: Clearly, the `Sequence` idea was meant to represent the result set, without the caller actually having to work with the rows in the result set. The accumulate method would handle any per-row operations. Hence, `Sequence` reifies (represents as an object) the result set, but not the operations on that result set.
   
   By contrast, the `Operator` model reifies (represents as an object) operations, while leaving the result set as a non-reified abstraction. This is important when working with billions of rows: one can never hope to hold the whole result set, one is just building a pipeline that the rows flow through. Hence, the focus on the "pipe fittings", not the water. (Storm made this idea very explicit back in the day.)
   
   Which is "right"? Neither. The question is, which is fastest and simplest. Hence this discussion.
   
   *Semantic difference*: To see the difference in the two approaches, one has to pop up from the code to see the design. We've discussed the design above. The proposal separates planning and execution. It also combines a pile of sequences, yielders, lambdas, closures and other knick-knacks into a simple, clean operator structure. Why? For the reasons above: simpler to understand, easier to test, easier to debug, easier to reuse.
   
   Said another way, both the current design and the prototype run a scan query. In that sense they are the "same." The proposal suggests how we can do the same work with far less code and complexity by accepting that a query is a pipeline of stateful operators.
   
   The current code tries to follow the functional programming model to be stateless. But, since queries are, in fact, stateful, the code becomes very complex: state is often added via the "back door" of closures. The operator model accepts that queries have state and manages that state cleanly.
   
   *Stack trace is so much cleaner*: you are on the right track. The reason the stack trace is simpler is that, at runtime, we do only the required work and no overhead, no unnecessary abstractions. A limit operator counts its rows and says it's done when it's done. That's all. That can be implemented right in the operator. Since operators are very lightweight, there is no reason to have multiple layers of abstraction. The stack trace is smaller because operators are the KISS solution for queries.
   
   Yes, if someone were to implement operators by adding multiple layers of abstraction, then it would be ugly. The point is, those layers are not necessary. Every operator has one very simple task: get some input, do a transform, produce an output. If you want to do two different things, create two operators. We can do this because the planner will choose wisely about which operator to include: we don't include operators we don't need. Operators don't have to be two things. The result is a very simple runtime stack. And, as we've all learned, the name of the game in performance is to minimize the amount of code in the per-row path.
   
   *WrappingSequence, etc.*: You point out the many uses of wrapping sequences. Yes, those are a pain. Many seem to want to do something on sequence start or end, but end up having to insert themselves into the per-batch stream to do that. (We're really in the weeds now.)
   



[GitHub] [druid] paul-rogers commented on issue #11933: Discussion: operator structure for Druid queries?

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on issue #11933:
URL: https://github.com/apache/druid/issues/11933#issuecomment-972079123


   Let's discuss some of the prototype bits you mentioned to give context for anyone else reading along.
   
   ### Prototype Overview
   
   You found the key abstraction: the [`Operator`](https://github.com/paul-rogers/druid/blob/pipe/processing/src/main/java/org/apache/druid/query/pipeline/Operator.java) interface. This is just an iterator with `start()` and `close()` methods. In that sense, `Operator` is a bit like a `Sequence`/`Yielder` pair. The difference is that the code to process a batch of records is located entirely in the `Operator.next()` method, meaning that the `Operator` implementation can hold any required state as member variables. With the `Sequence` pattern, processing code lives outside the `Sequence`, often in a lambda. State also lives outside the `Sequence`, often in closures. The operator is a simpler way to achieve the same result.
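
   To make the contrast concrete, here is a minimal sketch of the idea. It is not the prototype's actual code: the `Operator` interface below is a simplified stand-in for the one linked above, `ListOperator`, `LimitOperator` and `LimitOperatorDemo` are hypothetical names, and batches are modeled as plain `List<T>` for brevity.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;

   // Simplified stand-in: an iterator over batches with explicit
   // start() and close() methods. Names and signatures are assumed.
   interface Operator<T>
   {
     void start();
     boolean hasNext();
     List<T> next();
     void close();
   }

   // Hypothetical mock input: replays a fixed list of batches.
   class ListOperator<T> implements Operator<T>
   {
     private final List<List<T>> batches;
     private Iterator<List<T>> iter;
     boolean closed;

     ListOperator(List<List<T>> batches) { this.batches = batches; }

     @Override public void start() { iter = batches.iterator(); }
     @Override public boolean hasNext() { return iter.hasNext(); }
     @Override public List<T> next() { return iter.next(); }
     @Override public void close() { closed = true; }
   }

   // Hypothetical limit: the row count is ordinary member state,
   // so no lambda or closure is needed to carry it.
   class LimitOperator<T> implements Operator<T>
   {
     private final Operator<T> input;
     private final long limit;
     private long rowCount;

     LimitOperator(Operator<T> input, long limit)
     {
       this.input = input;
       this.limit = limit;
     }

     @Override public void start() { input.start(); }

     @Override public boolean hasNext()
     {
       return rowCount < limit && input.hasNext();
     }

     @Override public List<T> next()
     {
       List<T> batch = input.next();
       long remaining = limit - rowCount;
       if (batch.size() > remaining) {
         // Truncate the final batch so we never exceed the limit.
         batch = new ArrayList<>(batch.subList(0, (int) remaining));
       }
       rowCount += batch.size();
       return batch;
     }

     @Override public void close() { input.close(); }
   }

   class LimitOperatorDemo
   {
     // Drains an operator into a flat list, always closing it.
     static <T> List<T> drain(Operator<T> op)
     {
       List<T> rows = new ArrayList<>();
       op.start();
       try {
         while (op.hasNext()) {
           rows.addAll(op.next());
         }
       } finally {
         op.close();
       }
       return rows;
     }

     public static void main(String[] args)
     {
       Operator<Integer> input = new ListOperator<>(
           Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)));
       // Prints [1, 2, 3, 4]: the second batch is truncated to one row.
       System.out.println(drain(new LimitOperator<>(input, 4)));
     }
   }
   ```

   Note that the limit is tested here against a mock input with no query runner, toolchest or segment in sight, which is the testability point made below.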
   
   Once we have an `Operator` construct, we can pretty easily [churn out those needed by the Scan Query](https://github.com/paul-rogers/druid/tree/pipe/processing/src/main/java/org/apache/druid/query/pipeline). In this case, the list of operators was created by refactoring the existing code to move the execution bits into an operator class. Basic [unit tests](https://github.com/paul-rogers/druid/tree/pipe/processing/src/test/java/org/apache/druid/query/pipeline) exist for each operator. (One of the nice things about the operator model is that operators are easy to test.)
   
   Operators are entirely run-time concepts: they just start, process batches, and close. `QueryRunner`s, as noted, actually do quite a bit of planning. So, the plan-time component moves into a planner abstraction. The planner here is a quick refactor of the scan query `QueryRunner`s into the [HistoricalQueryPlannerStub](https://github.com/paul-rogers/druid/blob/pipe/server/src/main/java/org/apache/druid/server/pipeline/HistoricalQueryPlannerStub.java). This is mostly a proof of concept, created in parallel with the operators: the run-time stuff went into operators, the plan-time stuff into the planner.
   
   The planner works roughly like the `QueryRunner`s: it is a recursive set of functions, each of which builds its operator given its child, and returns it. A `QueryRunner` typically makes some decisions, asks its child to run (e.g. plan), makes more decisions, and returns a `Sequence`.
   
   In the prototype, the decisions happen in the planner. The planner has functions to create the corresponding operator given the child (input) operator(s). There is basically one method in the prototype planner for each `QueryRunner` that was refactored.
   
   Note that, when we say "plan time" and "run time" in this proposal we mean in the Druid context: both occur as part of a REST message to run a query. `QueryRunner`s combine planning and running. The prototype separates them so the operators can be as simple as possible in the per-row code path.
   
   The prototype uses a "shim" to adapt the `ServerManager` to the planner/operator model. During refactoring, the shim first was a full copy of the `ServerManager` code. Then, as refactoring proceeded, the code moved into the planner, with the shim left as a rump that just invokes the planner. The planner creates the operator tree, then wraps the tree in a `Sequence` so it looks the same to the rest of the code (the `QueryHandler`, etc.)
   
   A goal of the prototype was to avoid changing any existing code. So, the "shim" just diverted query construction to the new mechanism, only for scan queries, and only when enabled. To avoid changing existing code, some bits were replicated. For example, the `QueryToolChest` is a way to provide query-specific planning. This is modeled in a separate class for now. Later, if we proceeded, such code would migrate back into the existing classes.
   
   There are two more minor bits. The fragment runner ensures operators are closed whether the query succeeds or fails. The `FragmentContext` is just a holder for all the stuff that an operator might need to do its work. Since the current code is a refactoring, the context just holds the query ID and the response context, which is all the current code uses. The fragment context is just a way of not having to pass common state into every operator constructor.



[GitHub] [druid] paul-rogers edited a comment on issue #11933: Discussion: operator structure for Druid queries?

Posted by GitBox <gi...@apache.org>.
paul-rogers edited a comment on issue #11933:
URL: https://github.com/apache/druid/issues/11933#issuecomment-974603066


   ### Prototype Based on Sequences
   
   Per a suggestion by @cheddar, did an experiment using the `Sequence`/`Yielder` pair. The question is: if we used straightforward classes, could the code based on `Sequence` be as simple as the prototype based on the operator protocol? Is the reason for stack clutter and extra code due more to excessive use of inner classes and lambdas than to the `Sequence` abstraction itself?
   
   The result is a [limit sequence](https://github.com/paul-rogers/druid/blob/ad5147a72ee77fd8e86655ea4e03c7e64e88f9b5/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitSequence.java), and a [base transform sequence](https://github.com/paul-rogers/druid/blob/ad5147a72ee77fd8e86655ea4e03c7e64e88f9b5/processing/src/main/java/org/apache/druid/query/scan/TransformSequence.java). Didn't refactor all of the scan query, just did the scan query limit operation. 
   
   This one little experiment suggests that, in fact, it probably is the `Sequence`/`Yielder` pattern that leads to complex code. In fact, the current limit implementation might actually be about as simple as we can make it, if we are required to use the `Sequence`/`Yielder` pattern.
   
   First, let's describe the new prototype, then later comments will outline what was learned.
   
   After fiddling about, the simplest implementation was to use the previous [operator](https://github.com/paul-rogers/druid/blob/pipe/processing/src/main/java/org/apache/druid/query/pipeline/ScanResultLimitOperator.java) version as the basis for the new prototype. The reason is that the operator code is about as simple as one can make a limit operation. That code was refactored from the original [limit iterator](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java) by replacing the original  `Sequence`/`Yielder` input with an input operator.
   
   Then, to turn the prototype limit into a `Sequence`, the code added the sequence methods `accumulate()` and `toYielder()`. Of these, `toYielder()` seems to be especially complex. The easiest path seemed to be to adapt the code from `BaseSequence` to implement both `accumulate()` and `toYielder()` in terms of the iteration methods from the prior code.
   
   The result is that our limit sequence is a simple iterator over batches (doing the needed counting, and truncating the last batch when needed), along with a bunch of extra overhead to implement the `Sequence`/`Yielder` protocol.
   
   In fact, it is probably simpler to split these two concepts, as in the first prototype: an operator is a fancy iterator: super simple. If you want a sequence, just wrap the operator in a standard sequence wrapper class. The result would probably be simpler than the current prototype: we'd use composition rather than the inheritance used to convert the limit into a sequence.
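
   A rough sketch of what that composition could look like follows. Everything here is assumed for illustration: `OperatorSequence` and `RangeOperator` are hypothetical names, the `accumulate()` signature is a simplification built on `java.util.function.BiFunction` rather than Druid's actual `Sequence` API, and `toYielder()` (the harder half) is left out entirely.

   ```java
   import java.util.function.BiFunction;

   // Simplified stand-in for the prototype's Operator (row-at-a-time here).
   interface Operator<T>
   {
     void start();
     boolean hasNext();
     T next();
     void close();
   }

   // One reusable wrapper turns any operator into something sequence-like.
   // The operator itself stays a plain iterator and knows nothing about this.
   class OperatorSequence<T>
   {
     private final Operator<T> operator;

     OperatorSequence(Operator<T> operator) { this.operator = operator; }

     // Folds every row into an accumulated value, then closes the operator
     // whether or not the fold succeeded.
     <R> R accumulate(R initValue, BiFunction<R, T, R> accumulator)
     {
       R result = initValue;
       operator.start();
       try {
         while (operator.hasNext()) {
           result = accumulator.apply(result, operator.next());
         }
       } finally {
         operator.close();
       }
       return result;
     }
   }

   // Hypothetical operator for the demo: emits 0, 1, ..., end - 1.
   class RangeOperator implements Operator<Integer>
   {
     private final int end;
     private int position;
     boolean closed;

     RangeOperator(int end) { this.end = end; }

     @Override public void start() { position = 0; }
     @Override public boolean hasNext() { return position < end; }
     @Override public Integer next() { return position++; }
     @Override public void close() { closed = true; }
   }

   class OperatorSequenceDemo
   {
     public static void main(String[] args)
     {
       // Prints 10, the sum of 0 through 4.
       System.out.println(
           new OperatorSequence<>(new RangeOperator(5)).accumulate(0, Integer::sum));
     }
   }
   ```

   The wrapper is written once and shared, so each transform only has to supply the iterator part.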
   
   We can now see why the current Druid limit code is already close to optimal, given the `Sequence`/`Yielder` requirement. It is an iterator wrapped in a reusable sequence class that provides a standard yielder.
   
   This also suggests why Druid uses so many extra knick-knacks at runtime. One often needs an iterator-like thing that does the work, a sequence wrapper around the iterator, and a yielder/yielding accumulator wrapper around the sequence. For. Every. Transform.
   
   In short, the core limit operation is simple. It becomes bogged down, however, when it has to implement the `Sequence`/`Yielder` protocol. The operator prototype shows that code is actually not needed. Druid would be simpler if we just removed it.
   
   The conclusion from this little exercise is that, while our use of inner classes and closures is a bit extravagant, the extra layers required by the `Sequence`/`Yielder` protocol (compared with the traditional operator protocol) seem to encourage this complexity to make all the extra bits and pieces fit together. If we make the protocol as simple as possible, it is likely a good portion of that extra code will melt away.



[GitHub] [druid] paul-rogers edited a comment on issue #11933: Discussion: operator structure for Druid queries?

Posted by GitBox <gi...@apache.org>.
paul-rogers edited a comment on issue #11933:
URL: https://github.com/apache/druid/issues/11933#issuecomment-974741126


   ### Migration Approach
   
   @cheddar asked another great question: Suppose we wanted to consider the operator approach. How would we do it without breaking things or hurting performance?
   
   To prove the operator concept, the full prototype presents a "whole enchilada" solution: convert sequences to operators, and convert query runners into a planner. However, this is far too much work to do in one go in the production code. What steps might we consider that produce value at each step?
   
   We'd start with refactoring each query runner when convenient, such as when we need to work in that area anyway, leaving the rest unchanged. Test each step. Once all are converted, consider refactoring the query runners to create an integrated planner.
   
   [Yet another prototype](https://github.com/paul-rogers/druid/tree/op-step1) shows how this could be done.
   
   ### Refactor Query Runners
   
   At first we only focus on individual query runners and their sequences. Today, the query "DAG" looks a bit like this:
   
   ```text
   input sequence --> yielder --> ad-hoc code --> output sequence
   ```
   
   Where the query runner does its thing in the ad-hoc code, then wraps it in a sequence. The ad-hoc code is often in the form of anonymous inner classes with state passed in via closures. The result is that the code is a bit hard to understand, debug and test. So, the first step is to fix that by applying the operator pattern within each query runner:
   
   ```text
   input sequence --> yielder --> operator --> output sequence
   ```
   
   Aside on naming: these notes use the term "operator" for the bit that does the actual row/batch transforms since the term "relational operator" is common in the literature. However, it doesn't matter what we call it: "sequence transformer", "data munger" or whatever.
   
   Compared with the ad-hoc code, an operator:
   
   * Is a named, static class that can be unit tested independent of its "host" query runner.
   * Uses indirection so that the operator does not depend on any specific input.
   
   In practice, the yielder abstraction is a bit awkward to use as an input iterator. So, add another wrapper:
   
   ```text
                                    shim       transform
   input sequence --> yielder --> operator --> operator --> output sequence
   ```
   
   Where the "shim operator" makes a yielder look like an operator, and the "transform operator" is the "guts" of what the query runner does.
   
   [A `Supplier` abstraction](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/main/java/org/apache/druid/query/pipeline/Operators.java#L145) runs the child query runner, obtains its output sequence, and wraps that sequence in the shim operator. This decouples each operator from whatever produces the input. To the operator, it looks like it gets an operator as input, though the actual input might be a sequence. (Sorry, it gets a bit confusing as we add layers...)
   
   The new prototype uses the [operators](https://github.com/paul-rogers/druid/tree/op-step1/processing/src/main/java/org/apache/druid/query/pipeline) from the first prototype, and adds the needed [adapters](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/main/java/org/apache/druid/query/pipeline/Operators.java). Then, rather than wholesale restructuring of the query runners into a query planner, this prototype just fits operators into the existing query runner structure. We create a [replacement query runner](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/main/java/org/apache/druid/query/pipeline/LimitAndOffsetRunner.java) for the part of the scan query that handles limit and offset.
   
   Then, a bit of a hack is added so that the [`ScanQueryQueryToolChest`](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java#L70) can execute either the original sequence-based limit and offset, or the new operator-based one, depending on a hard-coded switch.
   
   Now that the operators are named classes with few dependencies, it is easy to [unit test](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/test/java/org/apache/druid/query/pipeline/ScanQueryOperatorsTest.java) them using a mock input source.
   
   ### Fragment Context
   
   The existing code relies on the sequence protocol to ensure resources are released. The next step is to begin to transfer that task to a "fragment context": the thing which manages the fragment of the query running on this one node. At this point, the fragment context is redundant, but we'll need it for the next step.
   
   We define a fragment context as a registry of operators. The fragment context is passed into each query runner. The new prototype does this with the help of the [`ResponseContext`](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/main/java/org/apache/druid/query/context/ResponseContext.java#L359) since that's all that's available. It ain't pretty, but it works for this prototype. [Each runner registers](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/main/java/org/apache/druid/query/pipeline/LimitAndOffsetRunner.java#L138) the operator(s) it creates. 
   
   Some [hackery](https://github.com/paul-rogers/druid/blob/op-step1/server/src/main/java/org/apache/druid/server/QueryLifecycle.java#L290) was done to handle the fragment context, at least for native queries. (Again, not elegant, just a proof-of-concept.)
   
   The fragment context, for queries, is a bit like Druid's [`Lifecycle`](https://github.com/apache/druid/blob/master/core/src/main/java/org/apache/druid/java/util/common/lifecycle/Lifecycle.java) abstraction for services: it ensures things are shut down in the proper order regardless of query outcome.
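   
   A minimal sketch of the registry idea follows. It is not the prototype's code: the `FragmentContext` class here is hypothetical, and closing in reverse registration order is an assumption made for this sketch (the prototype may order things differently). It shows operators being closed even when the query fails part way through.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.List;

   class FragmentContextSketch {
     /** Hypothetical registry of the operators created for one fragment. */
     static class FragmentContext implements AutoCloseable {
       private final Deque<AutoCloseable> operators = new ArrayDeque<>();

       /** Each runner registers the operator(s) it creates. */
       void register(AutoCloseable operator) {
         operators.push(operator);
       }

       /** Closes every operator, last registered first, even if some closes fail. */
       @Override
       public void close() {
         RuntimeException failure = null;
         while (!operators.isEmpty()) {
           try {
             operators.pop().close();
           } catch (Exception e) {
             if (failure == null) {
               failure = new RuntimeException("operator close failed", e);
             } else {
               failure.addSuppressed(e);
             }
           }
         }
         if (failure != null) { throw failure; }
       }
     }

     /** Simulates a query that fails mid-stream and records what got closed. */
     static List<String> runFailingQuery() {
       List<String> log = new ArrayList<>();
       try (FragmentContext context = new FragmentContext()) {
         context.register(() -> log.add("close scan"));
         context.register(() -> log.add("close limit"));
         throw new IllegalStateException("query failed mid-stream");
       } catch (IllegalStateException e) {
         log.add("caught: " + e.getMessage());
       }
       return log;
     }

     public static void main(String[] args) {
       System.out.println(runFailingQuery());
     }
   }
   ```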
   
   ### Optimize Away Sequences
   
   At this point, our little section of the scan query "pipeline" looks a bit like this:
   
   ```text
   offset operator --> sequence --> yielder --> shim operator --> limit operator
   ```
   
   Since operators don't care where their input comes from, as long as it looks like an operator and delivers the right data, we can [optimize away](https://github.com/paul-rogers/druid/blob/op-step1/processing/src/main/java/org/apache/druid/query/pipeline/Operators.java#L105) the middle bits above. When we create that shim, we check if the input sequence is "backed" by an operator. If so, we return that operator instead of creating the shim, resulting in:
   
   ```text
   offset operator --> limit operator
   ```
   
   Which then gives us that compact stack shown in the original post (at least for these two operations).
   
   Logically, the sequence is still present, and a query runner that wants to use a sequence can do so. But, if adjacent query runners both create operators, the sequence just melts away, leaving a cleaner, more efficient pipeline.
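   
   The unwrapping check can be sketched as follows. This is an illustration under simplifying assumptions, not the prototype's `Operators` code: `Sequence` here is a hypothetical one-method stand-in, and `OperatorBackedSequence`, `SequenceShim` and `toOperator` are names invented for this note.
   
   ```java
   import java.util.Iterator;
   import java.util.List;

   class SequenceUnwrapSketch {
     /** Simplified, hypothetical operator: just an iterator here. */
     interface Operator<T> extends Iterator<T> { }

     /** Hypothetical stand-in for a sequence: something rows can be read from. */
     interface Sequence<T> {
       Iterator<T> iterator();
     }

     /** A leaf operator that serves rows from a list. */
     static class ListOperator<T> implements Operator<T> {
       private final Iterator<T> rows;
       ListOperator(List<T> rows) { this.rows = rows.iterator(); }
       @Override public boolean hasNext() { return rows.hasNext(); }
       @Override public T next() { return rows.next(); }
     }

     /** Operator-to-sequence wrapper: remembers the operator behind it. */
     static class OperatorBackedSequence<T> implements Sequence<T> {
       final Operator<T> operator;
       OperatorBackedSequence(Operator<T> operator) { this.operator = operator; }
       @Override public Iterator<T> iterator() { return operator; }
     }

     /** Sequence-to-operator shim, for sequences that no operator stands behind. */
     static class SequenceShim<T> implements Operator<T> {
       private final Iterator<T> rows;
       SequenceShim(Sequence<T> sequence) { this.rows = sequence.iterator(); }
       @Override public boolean hasNext() { return rows.hasNext(); }
       @Override public T next() { return rows.next(); }
     }

     /** Returns the backing operator when there is one; otherwise builds a shim. */
     static <T> Operator<T> toOperator(Sequence<T> sequence) {
       if (sequence instanceof OperatorBackedSequence) {
         return ((OperatorBackedSequence<T>) sequence).operator;
       }
       return new SequenceShim<>(sequence);
     }

     public static void main(String[] args) {
       Operator<String> offset = new ListOperator<>(List.of("a", "b"));
       Operator<String> unwrapped = toOperator(new OperatorBackedSequence<>(offset));
       System.out.println("sequence melted away: " + (unwrapped == offset));
     }
   }
   ```
   
   A query runner that still returns an ordinary sequence simply gets the shim, so old and new code can sit side by side.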
   
   That fragment context we added above ensures that, despite operators being a dreaded "closable iterator", they all are guaranteed to get closed at the completion of each fragment (query running on one node).
   
   ### Query Runners Create Operators
   
   Imagine that the entire query stack is eventually modified to follow the above pattern. Each query runner creates an operator, wraps it in a sequence, which is then unwrapped by the "upstream" operator created by the upstream query runner. At this point, we can just discard the sequences and let each query runner return an operator instead. Any query runner that wants a sequence can have one: it just adds the operator-to-sequence shim.
   
   The present prototype does not show this (not all query runners are converted: just one of them), but this gives an idea of where we could go over time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] paul-rogers commented on issue #11933: Discussion: operator structure for Druid queries?

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on issue #11933:
URL: https://github.com/apache/druid/issues/11933#issuecomment-1008460229


   Finally got some time to finish up the approach which @cheddar suggested. For the scan query, each `QueryRunner` checks if it should use the operator structure. If so, it plans its work as an operator. The to/from sequence magic, plus removing sequences between operators, allows a seamless conversion from a chain of sequences to a chain of operators.
   
   The code uses the trick mentioned above: if an operator only does things on open or close, then it removes itself from the per-row/batch path. Example: the thread renamer and the segment reference counter.
   
   Further checks within each runner avoid creating an operator when the operator would do nothing. Example: omit the finalize results operator if the transform is the identity.
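   
   Both tricks can be sketched in a few lines. The sketch assumes an operator shape in which `open()` hands back the iterator to read from, so that an operator which only acts on open and close can return its child's iterator directly. That shape, the class names, and the convention that a `null` finalizer means "identity" are all assumptions made for illustration, not the prototype's actual code.
   
   ```java
   import java.util.Iterator;
   import java.util.List;
   import java.util.function.UnaryOperator;

   class PlanTimePruningSketch {
     /** Hypothetical operator shape: open() returns the iterator to read from. */
     interface Operator<T> {
       Iterator<T> open();
       void close();
     }

     static class ListOperator<T> implements Operator<T> {
       private final List<T> rows;
       ListOperator(List<T> rows) { this.rows = rows; }
       @Override public Iterator<T> open() { return rows.iterator(); }
       @Override public void close() { }
     }

     /** Works only on open and close, so it adds no frame to the per-row path. */
     static class ThreadRenameOperator<T> implements Operator<T> {
       private final Operator<T> child;
       private final String name;
       private String savedName;
       ThreadRenameOperator(Operator<T> child, String name) {
         this.child = child;
         this.name = name;
       }
       @Override public Iterator<T> open() {
         savedName = Thread.currentThread().getName();
         Thread.currentThread().setName(name);
         return child.open();  // hand back the child's iterator, not a wrapper
       }
       @Override public void close() {
         child.close();
         Thread.currentThread().setName(savedName);
       }
     }

     /** A per-row transform, needed only when there is real work to do. */
     static class MapOperator<T> implements Operator<T> {
       private final Operator<T> child;
       private final UnaryOperator<T> fn;
       MapOperator(Operator<T> child, UnaryOperator<T> fn) {
         this.child = child;
         this.fn = fn;
       }
       @Override public Iterator<T> open() {
         Iterator<T> input = child.open();
         return new Iterator<T>() {
           @Override public boolean hasNext() { return input.hasNext(); }
           @Override public T next() { return fn.apply(input.next()); }
         };
       }
       @Override public void close() { child.close(); }
     }

     /** Plan-time check: a null finalizer stands for the identity, so add nothing. */
     static <T> Operator<T> planFinalize(Operator<T> child, UnaryOperator<T> finalizer) {
       if (finalizer == null) { return child; }
       return new MapOperator<>(child, finalizer);
     }

     public static void main(String[] args) {
       Operator<String> scan = new ListOperator<>(List.of("a", "b"));
       System.out.println("finalize omitted: " + (planFinalize(scan, null) == scan));
     }
   }
   ```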
   
   The resulting "plan time" code (the part which the `QueryRunner`s do, as opposed to the part that the sequences or operators do) could now easily be refactored into a planner, as in the first cut at this approach. However, as @cheddar suggested, the present hybrid approach allows seamless mixing/matching of the "classic" sequence-based code and the shiny new operator-based code.
   
   The resulting stack, when stopped at an innermost `next` is pretty simple:
   
   ```text
   Daemon Thread [scan_wikiticker_[2015-09-12T13:00:00.000Z/2015-09-12T14:00:00.000Z]]	
   	ScanQueryOperator$Impl.next() line: 189	
   	ConcatOperator.next() line: 83	
   	ScanResultLimitOperator.groupedNext() line: 72	
   	ScanResultLimitOperator.next() line: 65	
   	CpuMetricOperator.next() line: 82	
   	Operators$OperatorWrapperSequence<T>(BaseSequence<T,IterType>).makeYielder(OutType, YieldingAccumulator<OutType,T>, IterType) line: 90	
   	Operators$OperatorWrapperSequence<T>(BaseSequence<T,IterType>).toYielder(OutType, YieldingAccumulator<OutType,T>) line: 69	
   	Yielders.each(Sequence<T>) line: 32	
   	QueryResource.doPost(InputStream, String, HttpServletRequest) line: 224	
   ```
   
   Would be nice to abstract away that top sequence/yielder as well which is used to stream results to the client.
   
   Query (which is kind of silly):
   
   ```sql
   SELECT "__time", "page", "delta"
   FROM "wikiticker"
   WHERE "__time" >= TIMESTAMP '2015-09-12 13:00:00'
     AND "__time" < TIMESTAMP '2015-09-12 15:00:00'
     AND "channel" = '#en.wikipedia'
     AND "isRobot" = 'false'
     AND "page" LIKE 'User talk:D%'
   LIMIT 10
   ```
   
   The next step, when I get a bit more time, is to time each approach for a scan of a large chunk of Wikipedia. That will tell us if the simpler stack does, in fact, translate into faster execution.




[GitHub] [druid] paul-rogers commented on issue #11933: Discussion: operator structure for Druid queries?

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on issue #11933:
URL: https://github.com/apache/druid/issues/11933#issuecomment-1031161332


   Did a bit of performance testing. Created a mock storage adapter and mock cursor that simulates a Wikipedia segment, but just makes up 5M rows of data. Compared performance of operators and yielders. In the default mode, the scan query uses the list-of-maps format, and map creation takes a long time (about 1/4 of the run time), so switched to a compact list.
   
   As it turns out, under this scenario, with 245 batches, the operators and yielders take about the same amount of time: about 490 ms average over 100 runs, after 100 runs of warm-up.
   
   With only 245 batches, the test does not put much weight on the part that differs: the per-batch overhead. The default batch size is 5 * 4K rows, so I reduced the batch size to 1000 rows, which gives 5000 batches.
   
   With this change, the difference in the two approaches becomes clearer. Again, times averaged over 100 queries:
   
   Operators: 499 ms.
   Yielders: 607 ms.
   
   That is, the operator solution, with its reduced number of function call levels, takes about 82% of the time of the stack of sequences and yielders doing the same work.
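   
   The figures above can be cross-checked with a little arithmetic. The sketch below assumes that "4K" means 4096 rows (which reproduces the 245 batches reported) and, as a rough upper bound, attributes the whole gap between the two timings to per-batch overhead. The class and method names are made up for this note.
   
   ```java
   import java.util.Locale;

   class BatchOverheadSketch {
     /** Number of batches needed for `rows` rows, rounding up. */
     static long batches(long rows, long batchSize) {
       return (rows + batchSize - 1) / batchSize;
     }

     /** Operator time as a fraction of yielder time. */
     static double timeRatio(double operatorMs, double yielderMs) {
       return operatorMs / yielderMs;
     }

     /** Time gap per batch, in microseconds, if the whole gap is per-batch cost. */
     static double gapPerBatchMicros(double operatorMs, double yielderMs, long batchCount) {
       return (yielderMs - operatorMs) * 1000.0 / batchCount;
     }

     public static void main(String[] args) {
       long rows = 5_000_000L;
       long defaultBatches = batches(rows, 5 * 4096);  // 245 with 4K taken as 4096
       long smallBatches = batches(rows, 1000);        // 5000
       System.out.println(defaultBatches + " batches vs. " + smallBatches + " batches");
       System.out.println(String.format(Locale.ROOT, "ratio: %.3f", timeRatio(499, 607)));
       System.out.println(String.format(Locale.ROOT, "gap per batch: %.1f us",
           gapPerBatchMicros(499, 607, smallBatches)));
     }
   }
   ```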
   
   To be fair, in a normal query, testing suggests that the yielder call stack overhead is lost in the noise of the actual work of the query. So, while operators might be a bit faster, the real win is in the code simplification, and the ability to compose operators beyond what the query runners can do today.
   
   FWIW, the setup is an ancient Intel i7 with 8 cores, running Linux Mint (Ubuntu), Java 14. A Python client sent a native scan query to a historical running in the IDE. The query itself selects three columns from one segment with no filters. This is not the optimal performance setup, but all we're interested in is relative costs.
   




[GitHub] [druid] paul-rogers removed a comment on issue #11933: Discussion: operator structure for Druid queries?

Posted by GitBox <gi...@apache.org>.
paul-rogers removed a comment on issue #11933:
URL: https://github.com/apache/druid/issues/11933#issuecomment-972029941


   @cheddar, thanks for the comments. I'm impressed that you dug deep into the code! The posted description is kind of high level because I didn't really expect folks to dive in as far as you did. So, let's go into the details. I suppose what we really need is a table that maps the existing concepts to the corresponding operator concepts. I'll add that at some point.
   
   First, let's discuss some of the prototype bits you mentioned to give context for anyone else reading along.
   
   ### Prototype Overview
   
   You did find the key abstraction: the [`Operator`](https://github.com/paul-rogers/druid/blob/pipe/processing/src/main/java/org/apache/druid/query/pipeline/Operator.java) interface. This is just an iterator with `start()` and `close()` methods. In that sense, `Operator` is like a `Sequence`/`Yielder` pair. The difference is that the code to process a batch of records is located entirely in the `Operator.next()` method, meaning that the `Operator` implementation can hold any required state as member variables.
   
   Once we have an `Operator` construct, we can pretty easily [churn out those needed by the Scan Query](https://github.com/paul-rogers/druid/tree/pipe/processing/src/main/java/org/apache/druid/query/pipeline). In this case, the list of operators was created by refactoring the existing code to move the execution bits into an operator class. Basic [unit tests](https://github.com/paul-rogers/druid/tree/pipe/processing/src/test/java/org/apache/druid/query/pipeline) exist for each operator. (One of the nice things about the operator model is that they are easy to test.)
   
   Operators are entirely run-time concepts: they just start, process batches, and close. `QueryRunner`s, as noted, actually do quite a bit of planning. So, the plan-time component moves into a planner abstraction. The planner here is a quick refactor of the scan query `QueryRunner`s into the [HistoricalQueryPlannerStub](https://github.com/paul-rogers/druid/blob/pipe/server/src/main/java/org/apache/druid/server/pipeline/HistoricalQueryPlannerStub.java). This is mostly a proof of concept, created in parallel with the operators: the run-time stuff went into operators, the plan-time stuff into the planner.
   
   The planner works roughly like the `QueryRunner`s: it is a recursive set of functions, each of which builds its operator given its child, and returns it. This is a bit like how a `QueryRunner` calls its child's `run()`, then creates another `Sequence` around the result. Here, however, we build the operators at plan time, and run them at run time (both of which are just two steps within the overall "run a scan query" task.)
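   
   Here is a minimal sketch of that recursive plan-then-run shape. It is not the `HistoricalQueryPlannerStub` code: `ScanSpec`, the `plan*` functions and the toy operators are hypothetical, and the "scan" simply reads from a list.
   
   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;

   class PlannerSketch {
     /** Simplified, hypothetical operator: just an iterator here. */
     interface Operator<T> extends Iterator<T> { }

     static final long NO_LIMIT = Long.MAX_VALUE;

     /** Hypothetical query description: the rows to scan, plus offset and limit. */
     static class ScanSpec {
       final List<String> rows;
       final long offset;
       final long limit;
       ScanSpec(List<String> rows, long offset, long limit) {
         this.rows = rows;
         this.offset = offset;
         this.limit = limit;
       }
     }

     static class ListOperator<T> implements Operator<T> {
       private final Iterator<T> rows;
       ListOperator(List<T> rows) { this.rows = rows.iterator(); }
       @Override public boolean hasNext() { return rows.hasNext(); }
       @Override public T next() { return rows.next(); }
     }

     static class OffsetOperator<T> implements Operator<T> {
       private final Operator<T> child;
       private long toSkip;
       OffsetOperator(Operator<T> child, long offset) { this.child = child; this.toSkip = offset; }
       @Override public boolean hasNext() {
         while (toSkip > 0 && child.hasNext()) { child.next(); toSkip--; }
         return child.hasNext();
       }
       @Override public T next() {
         if (!hasNext()) { throw new NoSuchElementException(); }
         return child.next();
       }
     }

     static class LimitOperator<T> implements Operator<T> {
       private final Operator<T> child;
       private long remaining;
       LimitOperator(Operator<T> child, long limit) { this.child = child; this.remaining = limit; }
       @Override public boolean hasNext() { return remaining > 0 && child.hasNext(); }
       @Override public T next() {
         if (!hasNext()) { throw new NoSuchElementException(); }
         remaining--;
         return child.next();
       }
     }

     // Plan time: each function plans its child, then decides whether to add itself.
     static Operator<String> planScan(ScanSpec spec) {
       return new ListOperator<>(spec.rows);
     }

     static Operator<String> planOffset(ScanSpec spec) {
       Operator<String> child = planScan(spec);
       if (spec.offset == 0) { return child; }
       return new OffsetOperator<>(child, spec.offset);
     }

     static Operator<String> planLimit(ScanSpec spec) {
       Operator<String> child = planOffset(spec);
       if (spec.limit == NO_LIMIT) { return child; }
       return new LimitOperator<>(child, spec.limit);
     }

     // Run time: just pull rows from the root operator.
     static List<String> run(Operator<String> root) {
       List<String> out = new ArrayList<>();
       while (root.hasNext()) { out.add(root.next()); }
       return out;
     }

     public static void main(String[] args) {
       ScanSpec spec = new ScanSpec(List.of("a", "b", "c", "d", "e"), 1, 2);
       System.out.println(run(planLimit(spec)));
     }
   }
   ```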
   
   There are two more minor bits. The fragment runner ensures operators are closed whether the query succeeds or fails. The `FragmentContext` is just a holder for all the stuff that an operator might need to do its work. Since the current code is a refactoring, the context just holds the query ID and the response context, which is all the current code uses. The fragment context is just a way of not having to pass common state into every operator constructor.
   
   ### Why are we Considering This?
   
   Let's tackle the main question first, since it motivates everything else: why are we bothering with this prototype? There are several reasons, as outlined in the proposal.
   
   The first thing to note is that the operator concept is not a new idea: it is the standard approach in a bunch of other tools (Spark, Drill, Presto/Trino, Impala, and many others.) So, we're just trying to draft off of what others have learned.
   
   * Operators are independent of one another. An operator accepts inputs, does its thing, and returns results. It does not care what the input is. By contrast, a `QueryRunner` has to know what child to create (often via a helper such as a toolchest.)
   * Separation of planning and execution. Both occur within a single query request. In the current code, the two operations are mixed. With the operator model, a planner decides which operators are needed, then the operators go off and run the query. Classic separation of concerns.
   * Operators are stateful. Almost all query operators, in any tool, are stateful. Limits need a row count. Merges need to keep track of the next row from each input. And so on. Operators provide a simple, clean way to hold that necessary state. The current implementation has state, of course, but it is added via closures, lambdas and all manner of cleverness.
   * Operators are simple. The operator model has evolved over many years and many projects. The result is the simplest possible way to get the job done: often with very few classes, and other constructs. Simple code is faster, easier to understand and easier to test. Again, a classic observation.
   * Simpler to debug. The stack trace shows that the runtime code is simpler: the actual operations are front and center.
   * Perhaps more efficient: Smaller amounts of code and a smaller runtime stack can mean less overhead. A good next step would be to run an actual performance test to determine if there is any actual benefit. (It could be that most of the time is spent in I/O, so reducing CPU won't help.)
   * Easier to test. Each operator is independent of others. We can test a limit operator, say, by using a mock data source ("reader"): the limit has no way to know (or care) where its data comes from. This allows other query engines to have robust, detailed unit tests for each operator.
   * Reusable. Because operators are agnostic, they are reusable. Of course, the current implementation could also be made more reusable, but operators are inherently so because they are designed to limit dependencies.
   
   The point of this discussion is: can we get some of this operator goodness for Druid?
   
   ### Answers
   
   With that background, let's answer the questions you raised.
   
   *Different implementation*: The prototype is a refactoring of the existing scan query code, as described above. Since the planner works differently than how the toolchest, etc. works, it was not possible to directly use those in the prototype. Instead, the prototype uses those parts which work as-is, and made a refactored copy of those bits that need to change to work in the context of the new planner. Basically, those parts, such as `mergeResults()`, which build `QueryRunner`s are mapped into a different, but parallel, implementation in the prototype operator planner. If we were to proceed, that code should move to the toolchest, since the toolchest represents the query-type behavior. In other cases, since this is a refactoring, I kept whatever I could, even if that meant the prototype is somewhat more scattered than a final implementation would be.
   
   *Operator vs. Sequence*: Clearly, the `Sequence` idea was meant to represent the result set, without the caller actually having to work with the rows in the result set. The accumulate method would handle any per-row operations. Hence, `Sequence` reifies (represents as an object) the result set, but not the operations on that result set.
   
   By contrast, the `Operator` model reifies (represents as an object) operations, while leaving the result set as a non-reified abstraction. This is important when working with billions of rows: one can never hope to hold the whole result set, one is just building a pipeline that the rows flow through. Hence, the focus on the "pipe fittings", not the water. (Storm made this idea very explicit back in the day.)
   
   Which is "right"? Neither. The question is, which is fastest and simplest. Hence this discussion.
   
   *Semantic difference*: To see the difference in the two approaches, one has to pop up from the code to see the design. We've discussed the design above. The proposal separates planning and execution. It also combines a pile of sequences, yielders, lambdas, closures and other knick-knacks into a simple, clean operator structure. Why? For the reasons above: simpler to understand, easier to test, easier to debug, easier to reuse.
   
   Said another way, both the current design and the prototype run a scan query. In that sense they are the "same." The proposal suggests how we can do the same work with far less code and complexity by leveraging the idea of accepting that a query is a pipeline of stateful operators.
   
   The current code tries to follow the functional programming model to be stateless. But, since queries are, in fact, stateful, the code becomes very complex: state is often added via the "back door" of closures. The operator model accepts that queries have state and manages that state cleanly.
   
   *Stack trace is so much cleaner*: you are on the right track. The reason the stack trace is simpler is that, at runtime, we do only the required work, with no overhead and no unnecessary abstractions. A limit operator counts its rows and says it's done when it's done. That's all. That can be implemented right in the operator. Since operators are very lightweight, there is no reason to have multiple layers of abstraction. The stack trace is smaller because operators are the KISS solution for queries.
   
   Yes, if someone were to implement operators by adding multiple layers of abstraction, then it would be ugly. The point is, those layers are not necessary. Every operator has one very simple task: get some input, do a transform, produce an output. If you want to do two different things, create two operators. We can do this because the planner will choose wisely about which operator to include: we don't include operators we don't need. Operators don't have to be two things. The result is a very simple runtime stack. And, as we've all learned, the name of the game in performance is to minimize the amount of code in the per-row path.
   
   *WrappingSequence, etc.*: You point out the many uses of wrapping sequences. Yes, those are a pain. Many seem to want to do something on sequence start or end, but end up having to insert themselves into the per-batch stream to do that. (We're really in the weeds now.)
   




[GitHub] [druid] paul-rogers commented on issue #11933: Discussion: operator structure for Druid queries?

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on issue #11933:
URL: https://github.com/apache/druid/issues/11933#issuecomment-974606181


   ### Battle of the Abstractions
   
   The key question this issue raises is: can we make Druid's query code simpler while also making it faster? Is there benefit to simpler/faster code? And (to a lesser degree) what are the trade-offs?
   
   We've claimed that operators are simpler because they are the result of many decades of people trying to find what works for a query DAG. They learned to simplify the data path as much as possible. It turned out to be handy to build on the iterator abstraction.
   
   To be fair, we should look as closely at the `Sequence` abstraction, which is actually quite clever when used where it works well.
   
   It seems that a `Sequence` is essentially an opinionated list: it represents an ordered, finite sequence of elements. However, `Sequence` prefers not to give up its elements: it wants the caller to pass in a function to accumulate them. One could imagine how handy this could be for, say, a time series of a myriad of items (as in Druid): rather than handing all the items to the caller, have the caller specify what they want by providing an aggregation function. This is, in fact, what Druid's native queries are all about.
   
   The real question is: within a native query, does every transform follow this same pattern? Does a limit dip into the sequence offered by its input? Does a sort dip into its input and pull out a few items?
   
   Unfortunately, only the root of a DAG is in a position to do so. All other elements need to grab an input, do their transform, and pass along the results. In a query, the reader returns only the needed data (this is a key part of the Druid magic). All other transforms must process that data, even if the number of rows is large. Perhaps some transforms can throw out rows (`GROUP BY ... HAVING` for example). But, figuring out what to do is really the sole task of each transform. It is awkward if each transform asks its input to do the actual work.
   
   That is, most transforms must yield intermediate results. So, enter the `Yielder`, which is a highly opinionated way to iterate. A `Yielder` does not just iterate, it wants to accumulate partial results, which is why, to get a `Yielder`, you have to provide a `YieldingAccumulator`. That is, to get the next item, you need a class which will accumulate that one item so the yielder can then return it.
   
   The `Yielder` itself is ephemeral: the "right" way to use them is to have a new one for each value. There is also a special one for EOF. In addition to creating extra garbage on the heap, this means that the `Yielder` is just extra overhead between us and that next row that many transforms want.
   
   This is all great if we mostly do aggregation, and have just a few sequence types. However, all the extra code becomes a burden when we want to do non-aggregate transforms such as limit. We seem to have many, many different sequences, so we pay that complexity cost over and over.
   
   All this seems to suggest that as a base abstraction, an iterator (operator) is simpler. It is straight-forward to build aggregation on top of an iterator when needed, but more tedious to build an iterator on top of an aggregator. Further, we end up using an iterator anyway just to build the aggregator, which we then wrap to get back an iterator.
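   
   To illustrate the asymmetry, the sketch below builds aggregation on top of a plain iterator in a few lines, then goes the other way in the most naive manner possible: by buffering every row. `AccumulateOnly` is a hypothetical interface loosely modeled on the accumulate style described above, not Druid's `Sequence`; a `Yielder` exists to avoid exactly this buffering, at the cost of the extra machinery discussed.
   
   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.function.BiFunction;

   class AccumulateSketch {
     /** Hypothetical accumulate-only source: rows are visible only to a callback. */
     interface AccumulateOnly<T> {
       <R> R accumulate(R initial, BiFunction<R, T, R> fn);
     }

     /** Easy direction: aggregation layered on a plain iterator. */
     static <T, R> R accumulate(Iterator<T> rows, R initial, BiFunction<R, T, R> fn) {
       R result = initial;
       while (rows.hasNext()) {
         result = fn.apply(result, rows.next());
       }
       return result;
     }

     /** An accumulate-only source which, internally, uses an iterator anyway. */
     static class ListSource<T> implements AccumulateOnly<T> {
       private final List<T> rows;
       ListSource(List<T> rows) { this.rows = rows; }
       @Override public <R> R accumulate(R initial, BiFunction<R, T, R> fn) {
         return AccumulateSketch.accumulate(rows.iterator(), initial, fn);
       }
     }

     /** Harder direction, done naively: buffer all rows to get an iterator back. */
     static <T> Iterator<T> toIterator(AccumulateOnly<T> source) {
       List<T> buffer = source.accumulate(new ArrayList<T>(), (list, row) -> {
         list.add(row);
         return list;
       });
       return buffer.iterator();
     }

     public static void main(String[] args) {
       Integer sum = accumulate(List.of(1, 2, 3).iterator(), 0, (total, x) -> total + x);
       System.out.println("sum = " + sum);
       Iterator<String> rows = toIterator(new ListSource<>(List.of("x", "y")));
       while (rows.hasNext()) { System.out.println(rows.next()); }
     }
   }
   ```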
   
   The KISS principle would suggest that, if both can do the same job, go with the simpler one. 




[GitHub] [druid] cheddar commented on issue #11933: Discussion: operator structure for Druid queries?

Posted by GitBox <gi...@apache.org>.
cheddar commented on issue #11933:
URL: https://github.com/apache/druid/issues/11933#issuecomment-971591578


   That smaller stack trace definitely does seem nicer. I looked through the prototype branch and, honestly, wasn't able to fully wrap my head around the difference just from trying to dive into the code that was done.  From your description (and the stack trace) it looks like it's a totally different implementation that might be ignoring the Toolchests and Factories, but it seems like it's still using those?
   
   Once I found the `Operator` interface itself, if I'm understanding it correctly, I believe it is semantically equivalent to the `Sequence` interface.  `Iterator open()` is semantically equivalent to `Yielder<> toYielder()` and `close()` is semantically equivalent to closing that yielder.  `Sequence` also comes with a method that allows for accumulation with a callback, which was useful at one point but who knows how helpful it is anymore (it was pervasive in the way back ago times when Sequences only had the accumulate method and there was no such thing as a Yielder).  The only seeming semantic difference between them would be the FragmentContext which is being passed directly into the Operator such that the Operator can close over it in the iterator implementation, where the equivalent thing doesn't happen directly on the Sequence, but rather happens on the QueryRunner call.
   
   Honestly, the more I think about it, the more I think that the reason your stack trace is so much cleaner is because your Operators aren't using generic map/concat type operations but instead all of the logic is bundled together into each class which are implementing the Iterator interface.  If someone were to implement the proposed Operators by doing, e.g. a `Iterators.map(input.open(context), a -> a + 2)` I think you would get the same ugly stack traces with Operators as well.  As such, I think that what this proposal actually boils down to is a suggestion that we stop using map/concat/filter style operators and instead have named classes that do specific things so that the stacktrace is more pretty.  Fwiw, making the stack trace more pretty is super useful, so doing that is definitely nice.  There's a chance that the whole callback/accumulator structure of the `Sequence` object still makes the stack traces more ugly, but I think that's not actually the case.  The real thing that makes the stack traces so ugly is that we've got all these `WrappingSequence` and `ConcatSequence` and other things like that which are masking the actual work being done because the lambda that they do the work with doesn't actually make it into the stack trace.
   
   So, going back to your description, in general, I think that you can actually think of the current `QueryRunner` as the `QueryPlanner` (at least, at the segment-level anyway), and the Sequence as the `Operator`.
   
   
   All that said, I could be missing something major and I don't think there's any love for any specific way of writing the code.  If things can be better, let's make them better.  Some of the high level concerns:
   
   1) Technically speaking, the QueryToolChest and QueryRunnerFactories are extension points.  It would be good to get clarity on whether this proposal eliminates them as extension points, replaces them with a new extension point, or just magically keeps using them.  Depending on which one of those it is, strategies might vary.
   2) Query performance and memory management are important.  Not to say that the proposal makes those worse, but swapping out the underlying implementation in-place creates a relatively high burden for making sure that there is no adverse impact to performance and memory management of the process in the face of many concurrent queries.  Given that I think the proposal is just a stylistic adjustment to the current code without any new or different semantic constructs being added, making that adjustment to stop using concat/map/WrappingSequence, etc. shouldn't actually need to come with any sort of impact on performance, so if that understanding is correct, this is maybe not a big deal.
   3) Some parts of the physical plan cannot be determined until the query reaches the segment.  For example, if a query is expecting a numerical column, it might still get a String column in one segment and a numerical column in another segment, it needs to be intelligent enough to not balk at this and instead align the String column to be numerical at query time.  From your explanation, some of me thought the proposal is to move planning to a top-level thing before segments are visited, but the Operators seem to be running and planning at the segment-level, in which case, this is doing per-segment planning.
   4) There might be other requirements that I'm not thinking of immediately, but these are the ones that come to mind first.
   
   Depending on which understanding of the proposal is actually correct, the path to introducing the changes in the "safest" possible manner will be different.  Rather than speculating on all of the paths, it probably makes the most sense to come to a conclusion on what is actually different about the proposal as that is the primary input into which path to implementation makes the most sense.  Just to summarize, I think getting to that understanding of what is different can probably come from answering two questions:
   
   1) Why is the Operator interface not just different methods for the same semantics as what we have today?
   2) What happens to the QueryToolChest and QueryRunnerFactory interfaces in the Operator world?




[GitHub] [druid] paul-rogers commented on issue #11933: Discussion: operator structure for Druid queries?

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on issue #11933:
URL: https://github.com/apache/druid/issues/11933#issuecomment-972057286


   @cheddar, thanks for the comments. I'm impressed that you dug deep into the code! The issue description is kind of high level because I didn't really expect folks to dive in as far as you did. So, let's go into the details. I suppose what we really need is a table that maps the existing concepts to the corresponding operator concepts. I'll add that at some point.
   
   We'll explain the prototype and answer your questions. First, let's address the main question, which is...
   
   ### Why Are We Considering This?
   
   Let's answer this first since it motivates everything else. There are several reasons as outlined in the proposal.
   
   The first thing to note is that the operator concept is not a new idea: it is the standard approach in a bunch of other tools (Spark, Drill, Presto/Trino, Impala, and many others.) So, we're trying to draft off of what others have learned and see if we can apply some of that learning to Druid.
   
   * Operators are a simple, proven way to implement a query engine. The Volcano paper referenced above laid out the original argument. Numerous implementations show that the operator structure is simpler and easier to develop and debug (ignoring the often highly-complex code related to the creative ways that each tool stores its data. I'm looking at you, Drill value vectors...)
   * Operators are independent of one another. An operator accepts inputs, does its thing, and returns results. It does not care what the input is. By contrast, a `QueryRunner` has to know what child to create (often via a helper such as a toolchest.)
   * Separation of planning and execution. Both occur within a single query request. In the current code, the two operations are mixed. With the operator model, a planner decides which operators are needed, then the operators go off and run the query. Classic separation of concerns.
   * Operators are stateful. Almost all query operators, in any tool, are stateful. Limits need a row count. Merges need to keep track of the next row from each input. And so on. Operators provide a simple, clean way to hold that necessary state. The current implementation has state, of course, but it is added via closures, lambdas and all manner of cleverness.
   * Operators are simple. The operator model has evolved over many years and many projects. The result is the simplest possible way to get the job done, often with very few classes and other constructs. Simple code is faster, easier to understand and easier to test. Again, a classic observation.
   * Easier to understand. This translates to fewer bugs, more features, more extensions and more innovation. Overly complex code is a chore to change, and thus tends to resist change.
   * Standard solution. Open any database internals textbook, and you'll see an operator tree. Look at any query profile and you'll see an operator tree. Storm, Spark and others allow users to build their own operator trees. Operators are everywhere...except in Druid. By adopting a standard solution, Druid developers can leverage their existing knowledge rather than learning a new Druid-specific design.
   * Simpler to debug. The stack trace shows that the runtime code is simpler: the actual operations are front and center.
   * Perhaps more efficient: Smaller amounts of code and a smaller runtime stack can mean less overhead. A good next step would be to run an actual performance test to determine if there is any actual benefit. (It could be that most of the time is spent in I/O, so reducing CPU won't help.)
   * Easier to test. Each operator is independent of others. We can test a limit operator, say, by using a mock data source ("reader"): the limit has no way to know (or care) where its data comes from. This allows other query engines to have robust, detailed unit tests for each operator.
   * Reusable. Because operators are agnostic about their inputs, they are reusable. Of course, the current implementation could also be made more reusable, but operators are inherently so because they are designed to limit dependencies.
   * Easier to extend. Fewer dependencies means it's easier to insert new operators. Since the things which read data (“readers” is the common term) are just operators, it is easy to add new data sources (ignoring the planning aspect for the moment.) We keep the Druid extension goodness, but make extensions easier to add.
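
   To make the points above about state, independence and testing concrete, here is a minimal sketch of a Volcano-style limit operator fed by a mock reader. Everything in it is an assumption made for illustration: the `Operator` interface, `ListReader`, `LimitOperator` and `Operators.drain()` are hypothetical names, not Druid classes and not the prototype's actual code.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;

   // Hypothetical operator protocol: an iterator plus a close() hook.
   interface Operator<T> extends Iterator<T>, AutoCloseable
   {
     @Override
     void close();
   }

   // Mock "reader": a leaf operator that serves rows from an in-memory list.
   class ListReader<T> implements Operator<T>
   {
     private final Iterator<T> rows;
     boolean closed;

     ListReader(List<T> rows)
     {
       this.rows = rows.iterator();
     }

     @Override
     public boolean hasNext()
     {
       return rows.hasNext();
     }

     @Override
     public T next()
     {
       return rows.next();
     }

     @Override
     public void close()
     {
       closed = true;
     }
   }

   // Limit operator: its only state is a row count, held in a plain field.
   // It neither knows nor cares what kind of operator its input is.
   class LimitOperator<T> implements Operator<T>
   {
     private final Operator<T> input;
     private final long limit;
     private long rowCount;

     LimitOperator(Operator<T> input, long limit)
     {
       this.input = input;
       this.limit = limit;
     }

     @Override
     public boolean hasNext()
     {
       return rowCount < limit && input.hasNext();
     }

     @Override
     public T next()
     {
       if (!hasNext()) {
         throw new NoSuchElementException();
       }
       rowCount++;
       return input.next();
     }

     @Override
     public void close()
     {
       input.close();
     }
   }

   class Operators
   {
     // Runs an operator to completion, collects its rows, then closes it.
     static <T> List<T> drain(Operator<T> op)
     {
       List<T> rows = new ArrayList<>();
       try {
         while (op.hasNext()) {
           rows.add(op.next());
         }
       }
       finally {
         op.close();
       }
       return rows;
     }
   }

   class OperatorDemo
   {
     public static void main(String[] args)
     {
       Operator<Integer> reader = new ListReader<>(Arrays.asList(1, 2, 3, 4, 5));
       System.out.println(Operators.drain(new LimitOperator<>(reader, 3))); // prints [1, 2, 3]
     }
   }
   ```

   In this sketch, a unit test for the limit only has to build a `ListReader` and check the drained rows. A "planner" in this style would simply be the code that decides which operators to construct and how to chain them.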
   
   Again, the above isn't something I made up, or am suggesting as a "good idea." The above is a summary of what other tools have experienced. The point of this discussion is: can we get some of this operator goodness for Druid?
   
   ### What About the Current Code?
   
   An implicit assumption of this proposal is that the operator model provides benefits. You ask a good question: doesn't the current code provide those same benefits? This is tricky: the goal here is not to critique that code: it works and has worked for many years. It will probably work well for many more. All I can really say is that, as a newbie coming from competing tools, the query pipeline code in Druid seems more complex than it needs to be. Obviously, all working code is complex (as observed by someone many years ago.) However, is that complexity *necessary*, an *essential* part of solving the problem at hand, or is it *incidental*, something that just happens because we've chosen a complex way to solve the problem?
   
   The gist of this proposal is that 1) query engines are complex, 2) the Druid code is more complex than other similar code bases for the same task, and that 3) there may be benefits from refactoring to a simpler solution so we reduce unnecessary, incidental complexity.
   
   That's what this issue asks us to consider and discuss.




[GitHub] [druid] paul-rogers commented on issue #11933: Discussion: operator structure for Druid queries?

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on issue #11933:
URL: https://github.com/apache/druid/issues/11933#issuecomment-974603066


   Per a suggestion by @cheddar, did an experiment using the `Sequence`/`Yielder` pair. The question is: if we used straightforward classes, could the code based on `Sequence` be as simple as the prototype based on the operator protocol? Is the reason for stack clutter and extra code due more to excessive use of inner classes and lambdas than to the `Sequence` abstraction itself?
   
   The result is a [limit sequence](https://github.com/paul-rogers/druid/blob/ad5147a72ee77fd8e86655ea4e03c7e64e88f9b5/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitSequence.java), and a [base transform sequence](https://github.com/paul-rogers/druid/blob/ad5147a72ee77fd8e86655ea4e03c7e64e88f9b5/processing/src/main/java/org/apache/druid/query/scan/TransformSequence.java). Didn't refactor all of the scan query, just did the scan query limit operation. 
   
   This one little experiment suggests that, in fact, it probably is the `Sequence`/`Yielder` pattern that leads to complex code. In fact, the current limit implementation might actually be about as simple as we can make it, if we are required to use the `Sequence`/`Yielder` pattern.
   
   First, let's describe the new prototype, then later comments will outline what was learned.
   
   After fiddling about, the simplest implementation was to use the previous [operator](https://github.com/paul-rogers/druid/blob/pipe/processing/src/main/java/org/apache/druid/query/pipeline/ScanResultLimitOperator.java) version as the basis for the new prototype. The reason is that the operator code is about as simple as one can make a limit operation. That code was refactored from the original [limit iterator](https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/query/scan/ScanQueryLimitRowIterator.java) by replacing the original  `Sequence`/`Yielder` input with an input operator.
   
   Then, to turn the prototype limit into a `Sequence`, the code added the sequence methods `accumulate()` and `toYielder()`. Of these, `toYielder()` seems to be especially complex. The easiest path seemed to be to adapt the code from `BaseSequence` to implement both `accumulate()` and `toYielder()` in terms of the iteration methods from the prior code.
   
   The result is that our limit sequence is a simple iterator over batches (doing the needed counting, and truncating the last batch when needed), along with a bunch of extra overhead to implement the `Sequence`/`Yielder` protocol.
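
   As a rough illustration of the "simple iterator over batches" part, the counting and truncation logic might look like the sketch below. This is not the linked prototype code: `BatchLimitIterator` is a hypothetical name, and a batch is modeled as a plain `List` rather than the scan query's own batch type.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;

   // Hypothetical sketch: applies a row limit to a stream of batches.
   class BatchLimitIterator<T> implements Iterator<List<T>>
   {
     private final Iterator<List<T>> input;
     private final long limit;
     private long rowCount;

     BatchLimitIterator(Iterator<List<T>> input, long limit)
     {
       this.input = input;
       this.limit = limit;
     }

     @Override
     public boolean hasNext()
     {
       return rowCount < limit && input.hasNext();
     }

     @Override
     public List<T> next()
     {
       if (!hasNext()) {
         throw new NoSuchElementException();
       }
       List<T> batch = input.next();
       long remaining = limit - rowCount;
       if (batch.size() > remaining) {
         // Last batch: keep only the rows that still fit under the limit.
         batch = new ArrayList<>(batch.subList(0, (int) remaining));
       }
       rowCount += batch.size();
       return batch;
     }
   }

   class BatchLimitDemo
   {
     public static void main(String[] args)
     {
       List<List<Integer>> batches = Arrays.asList(
           Arrays.asList(1, 2, 3),
           Arrays.asList(4, 5, 6),
           Arrays.asList(7, 8)
       );
       Iterator<List<Integer>> limited = new BatchLimitIterator<>(batches.iterator(), 5);
       while (limited.hasNext()) {
         System.out.println(limited.next()); // prints [1, 2, 3] then [4, 5]
       }
     }
   }
   ```

   Everything beyond this in the limit sequence is protocol plumbing rather than limit logic.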
   
   In fact, it is probably simpler to split these two concepts, as in the first prototype: an operator is a fancy iterator: super simple. If you want a sequence, just wrap the operator in a standard sequence wrapper class. The result would probably be simpler than the current prototype: we'd use composition rather than the inheritance used to convert the limit into a sequence.
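
   A sketch of that composition idea follows. It makes strong simplifying assumptions: `SimpleSequence` is a hypothetical, cut-down stand-in that has only a fold-style `accumulate()`, whereas Druid's real `Sequence` interface also requires `toYielder()`, which is the harder half and is deliberately left out here. `IteratorSequence` is likewise an invented name.

   ```java
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.function.BiFunction;

   // Hypothetical, simplified stand-in for a sequence: accumulate() only.
   interface SimpleSequence<T>
   {
     <R> R accumulate(R initValue, BiFunction<R, T, R> accumulator);
   }

   // One generic adapter, written once, that wraps any iterator-style operator.
   // The operator itself needs no knowledge of the sequence protocol.
   class IteratorSequence<T> implements SimpleSequence<T>
   {
     private final Iterator<T> operator;

     IteratorSequence(Iterator<T> operator)
     {
       this.operator = operator;
     }

     @Override
     public <R> R accumulate(R initValue, BiFunction<R, T, R> accumulator)
     {
       R result = initValue;
       while (operator.hasNext()) {
         result = accumulator.apply(result, operator.next());
       }
       return result;
     }
   }

   class SequenceDemo
   {
     public static void main(String[] args)
     {
       SimpleSequence<Integer> seq = new IteratorSequence<>(Arrays.asList(1, 2, 3).iterator());
       Integer sum = seq.accumulate(0, (acc, row) -> acc + row);
       System.out.println(sum); // prints 6
     }
   }
   ```

   The design point is that the adapter is shared: each transform stays a plain iterator, and only callers that still need a sequence pay for the wrapper.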
   
   We can now see why the current Druid limit code is already close to optimal, given the `Sequence`/`Yielder` requirement. It is an iterator wrapped in a reusable sequence class that provides a standard yielder.
   
   This also suggests why Druid uses so many extra knick-knacks at runtime. One often needs an iterator-like thing that does the work, a sequence wrapper around the iterator, and a yielder/yielding accumulator wrapper around the sequence. For. Every. Transform.
   
   Bottom line: the core limit operation is simple. It becomes bogged down, however, when it has to implement the `Sequence`/`Yielder` protocol. The operator prototype shows that this protocol code is not actually needed. Druid would be simpler if we just removed it.

