Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2022/01/17 22:29:40 UTC

[GitHub] [drill] paul-rogers commented on issue #2421: [DISCUSSION] ValueVectors Replacement

paul-rogers commented on issue #2421:
URL: https://github.com/apache/drill/issues/2421#issuecomment-1014927069


   @Leon-WTF, you ask a good question. I'm afraid the answer is rather complex: hard to describe in a few words. Read if you're interested.
   
   Let's take a simple query. We have 10 scan fragments feeding 10 sort fragments that feed a single merge/"screen" fragment. Drill uses a random exchange to shuffle data from the 10 scan fragments to the 10 sorts, so that every sort gets data from every reader, balancing the load. There is an ordered exchange from the sorts to the merge. In "Impala notation":
   
   ```text
   Screen
   |
   Merge
   |
   Ordered Receiver
   |   -   -   -   -   -   -   -   -
   Ordered Sender
   |
   Sort
   |
   Unordered Receiver
   |  -   -   -   -   -   -   -   -
   Random Sender
   |
   Scan 
   ```
   
   The dotted lines are an ad-hoc addition to represent fragment boundaries. Remember there are 10 each of the lower two fragments, 1 of the top.
   
   First, let's consider the in-memory case. The scans read data and forward batches to random sorts. Each sort sorts every incoming batch and buffers it. When the sort sees EOF from all its inputs, it merges the buffered batches and sends the data downstream to the merge, one batch at a time.
   
   The merge needs a batch from each sort to proceed, so the merge can't start until the last sort finishes. (This is why a Sort is called a "blocking operator" or "buffering operator": it won't produce its first result until it has consumed all its input batches.)
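
The blocking behavior can be sketched in a few lines of Python. This is a toy model, not Drill code: `BufferingSort`, `add_batch` and `finish` are hypothetical names, and batches are plain lists of integers.

```python
import heapq


class BufferingSort:
    """Toy blocking sort: buffers sorted batches, emits only after EOF."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.sorted_batches = []

    def add_batch(self, batch):
        # Sort each incoming batch on arrival and hold it in memory.
        self.sorted_batches.append(sorted(batch))

    def finish(self):
        # Called once every input has signalled EOF: merge the buffered
        # batches and yield the result downstream one batch at a time.
        out = []
        for row in heapq.merge(*self.sorted_batches):
            out.append(row)
            if len(out) == self.batch_size:
                yield out
                out = []
        if out:
            yield out


sort = BufferingSort(batch_size=3)
for incoming in ([5, 1, 9], [4, 8, 2], [7, 3]):
    sort.add_batch(incoming)      # nothing is emitted while input arrives
result = list(sort.finish())      # output starts only after EOF
```

Nothing leaves the operator until `finish` is called, which is the property that makes the downstream merge wait for the slowest sort.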
   
   Everything is in memory, so each Sort can consume batches about as fast as the scans can produce them. The sorts all work in parallel, as we'd hope. The merge kicks in last, but that's to be expected: one can't merge without having first sorted all the data.
   
   OK, now for the spilling case, where the problems seem to crop up. (Full disclosure: I rewrote the current version of the Sort spilling code.) Now, data is too large to fit into memory for the sort. Let's focus on one sort, call it Sort 1. Sort 1 reads input batches until it fills its buffer. Then, Sort 1 pauses to write its sorted batches to disk. Simple enough. But, here is where things get tricky.
   
   All the scans must fill their output batches. Because we're doing random exchanges to the sorts, all outgoing batches are about the same size. Let's now pick one scan to focus on, Scan 2. Scan 2 fills up the outgoing batch for Sort 1, but Sort 1 is busy spilling, so Sort 1 can't read that batch. As a result, Scan 2 is blocked: it needs to add one more row for Sort 1, but it can't because the outgoing batch is full, and the downstream Sort won't accept it. So, Scan 2 grinds to a halt.
   
   The same is true for all other scans: as soon as they want to send to Sort 1 (which is spilling), they block. Soon, all scans are blocked. This means that all the other Sorts stop working: they have no incoming batches, so they are starved.
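
To make the stall concrete, here is a minimal sketch of a scan-side sender with one fixed-size outgoing batch per sort. Everything in it is an assumption for illustration (`BlockingSender`, `try_add`, the `accepting` flags, zero-based sort numbering); it is not how Drill's sender is written.

```python
class BlockingSender:
    """Toy scan-side sender with one fixed-size outgoing batch per sort."""

    def __init__(self, num_sorts, batch_size):
        self.batch_size = batch_size
        self.outgoing = [[] for _ in range(num_sorts)]

    def try_add(self, dest, row, accepting):
        """Add a row for sort `dest`; return False if the scan must block."""
        batch = self.outgoing[dest]
        if len(batch) == self.batch_size:
            if not accepting[dest]:
                return False          # full batch, busy receiver: scan stalls
            batch.clear()             # receiver took the batch
        batch.append(row)
        return True


sender = BlockingSender(num_sorts=3, batch_size=2)
accepting = [True, False, True]       # sort 1 is spilling
rows = [(0, "a"), (1, "b"), (1, "c"), (2, "d"), (1, "e"), (0, "f"), (2, "g")]
sent = 0
for dest, row in rows:
    if not sender.try_add(dest, row, accepting):
        break                         # rows for sorts 0 and 2 are stuck too
    sent += 1
```

The scan stops at the third row bound for the spilling sort, so the later rows for the idle sorts are never produced. One busy receiver is enough to stall the whole scan.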
   
   Eventually, Sort 1 completes its spill and starts reading again. This means Scan 2 can send and start working again. The same is true of the other Scans. Now, the next Sort, Sort 2, needs to spill. (Remember, the data is randomly distributed, so all Sorts see about the same number of rows.) So, the whole show occurs again. The scans can't send to Sort 2, so they stall. The other sorts become starved for inputs, and so they stop.
   
   Basically, the entire system becomes serialized on the sort spills: effectively, across the cluster only one spill will be active at any time, and that spill blocks senders which blocks the other sorts. We have a 10-way distributed, single-threaded query.
   
   Now, I've omitted some details. One is that, in Drill, every receiver is obligated to buffer three incoming batches before it blocks. So, the scenario above is not exactly right: there is buffering in the Unordered Receiver below each Sort. But, the net effect is the same once that three-batch buffer is filled.
   
   Even here there is an issue. Remember that each scan has to buffer a batch of 1024 rows for every sort. We have 10 scans and 10 sorts, so we're buffering 100 batches or 100K rows. And, the sorts have buffers of 3 batches each, so that's another 30 batches total, or 30K rows. All that consumes memory which is not available for the sort, hence the sort has to spill. That is, the memory design for Drill takes a narrow, per-operator view, and does not optimize memory use across the whole query.
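
The arithmetic is easy to check. A small sketch, using only the figures given above (1024-row batches, a three-batch receiver buffer); the function and constant names are made up for the example:

```python
ROWS_PER_BATCH = 1024      # rows in one full batch, as in the text
RECEIVER_BATCHES = 3       # batches each receiver buffers before it blocks


def exchange_buffers(scans, sorts):
    """Return (sender_batches, receiver_batches, total_rows) held in buffers."""
    sender_batches = scans * sorts             # one outgoing batch per (scan, sort) pair
    receiver_batches = sorts * RECEIVER_BATCHES
    total_rows = (sender_batches + receiver_batches) * ROWS_PER_BATCH
    return sender_batches, receiver_batches, total_rows


senders, receivers, rows = exchange_buffers(scans=10, sorts=10)
print(senders, receivers, rows)   # 100 30 133120
```

So roughly 130K rows sit in exchange buffers before any sort has kept a single row for itself.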
   
   All of the above is conjecture based on watching large queries grind to a crawl. Drill provides overall metrics, but not metrics broken down by time slices, so we have no good way to visualize behavior. Using ASCII graphics, here's what we'd expect to see:
   
   ```text
   Sort 1: rrrsss____________r_______r_______rsss_________
   Sort 2: rr___rsss__________r_______r_______rsss________
   ....
   Scan 1: www__w____________www_____www_____www___
   Scan 2: www__w____________www_____www_____www___
   ```
   
   Where "r" means "read and sort a batch", "s" means "spill", "w" means "write a batch downstream", and "_" means "blocked or starved." Everything is parallel until the first spill, then everything serializes. There are 10 sorts, so the extra wait time is for the other 8 sorts to do their thing. All very complex!
   
   And, this problem grows with the square of the number of nodes. With 100 fragments, we buffer 10K batches of 1K rows each, or 10M rows sitting in send buffers. This should be screaming at us, "you're doing it wrong!"
   
   Another detail I omitted is that Drill has something called a "Mux Exchange". 
   ```text
   ...
   |
   Unordered Receiver
   |    -   -   -   -   -   -   -   -
   | Mux Sender
   |
   | Mux Receiver
   |    -   -   -   -   -   -   -   -
   Unordered Sender
   ...
   ```
   
   All the scans on a single node share a set of outgoing buffers. Now, nobody understands this beast, and it is implemented as a separate layer of exchanges, which seems to make things worse. I remember back in the day, someone tried to characterize its behavior. But, since we didn't understand it, we just kept trying "Mux On/Mux Off" without really knowing what was happening. Having a part of the system that no one understands is never a great way to proceed.
   
   Still, the Mux thing is a good idea even if the implementation is flawed. Ideally, it would reduce the number of outgoing batches from n^2 to just n. But, it adds so much complexity, with vectors, that the benefit could never be clearly seen.
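
For a sense of scale, here is the n^2 versus n comparison as a short calculation. It assumes 1024-row batches and takes the idealized "just n" figure at face value; `buffered_rows` is a hypothetical helper, and a real Mux exchange would likely land somewhere between the two numbers.

```python
ROWS_PER_BATCH = 1024


def buffered_rows(fragments, mux=False):
    """Rows held in sender-side batches for an n-to-n exchange."""
    # Plain exchange: each of n senders keeps one batch per receiver (n * n).
    # Idealized mux: the text's best case of n shared outgoing batches.
    batches = fragments if mux else fragments * fragments
    return batches * ROWS_PER_BATCH


for n in (10, 100):
    print(n, buffered_rows(n), buffered_rows(n, mux=True))
# 10 102400 10240
# 100 10240000 102400
```

At 100 fragments the idealized Mux holds about as many rows as the plain exchange does at 10.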
   
   How would rows work better than vectors? With vectors, we build up, say, 1024 rows in each batch before sending. We can't send until the batch is full, but if any batch is full, we can't add another row until we send the batch we've got. This is too crude and leads to the above scenario. The Mux exchange can consume rows so that the scans don't have to build up full batches to forward to the Mux.
   
   Better would be to have a buffer of up to 1024 rows, but send whatever we have after some time. Now, each scan can continue to send to other sorts even while holding rows for Sort 1 while it spills. In this case, some of the other sorts will also fill their buffers and start to spill concurrently with Sort 1.
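
A minimal sketch of such a "full or stale" flush policy follows. The class, its method names and the `max_age` value are all assumptions for illustration, and the caller passes in the clock so the example stays deterministic.

```python
class TimedRowBuffer:
    """Outgoing buffer that flushes when full or when its oldest row is too old."""

    def __init__(self, max_rows=1024, max_age=0.05):
        self.max_rows = max_rows
        self.max_age = max_age        # seconds; an arbitrary illustrative value
        self.rows = []
        self.first_row_time = None

    def add(self, row, now):
        if not self.rows:
            self.first_row_time = now  # start the age clock on the first row
        self.rows.append(row)

    def should_flush(self, now):
        if not self.rows:
            return False
        full = len(self.rows) >= self.max_rows
        stale = now - self.first_row_time >= self.max_age
        return full or stale

    def flush(self):
        # Hand back whatever has accumulated and start a fresh buffer.
        batch, self.rows = self.rows, []
        self.first_row_time = None
        return batch


buf = TimedRowBuffer(max_rows=4, max_age=1.0)
buf.add("a", now=0.0)
buf.add("b", now=0.2)
early = buf.should_flush(now=0.5)    # not full, not stale yet
late = buf.should_flush(now=1.0)     # oldest row has waited max_age
partial = buf.flush()                # sends the two rows it has
```

The sender would keep one such buffer per destination, so a slow destination delays only its own rows.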
   
   Another idea is to do what Apex and other systems do: spill to disk if the receiver is blocked. (Or, equivalently, spill to disk on the receiver side if the consuming operator does not consume rows.)
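
A rough single-threaded sketch of the sender-side variant follows. It makes no claim about how Apex or any other system implements this: `SpillingSender` is hypothetical, the receiver is modeled as a bounded `queue.Queue`, and batches are pickled to a temporary file.

```python
import io
import pickle
import queue
import tempfile


class SpillingSender:
    """Sends batches to a bounded receiver queue; spills to disk when it is full."""

    def __init__(self, receiver_queue):
        self.receiver_queue = receiver_queue
        self.spill_file = tempfile.TemporaryFile()
        self.read_pos = 0             # file offset of the oldest spilled batch
        self.spilled = 0              # batches on disk not yet replayed

    def send(self, batch):
        # Preserve order: once anything is spilled, later batches spill too.
        if self.spilled == 0:
            try:
                self.receiver_queue.put_nowait(batch)
                return
            except queue.Full:
                pass
        self.spill_file.seek(0, io.SEEK_END)
        pickle.dump(batch, self.spill_file)
        self.spilled += 1

    def drain(self):
        # Replay spilled batches, oldest first, while the receiver has room.
        while self.spilled and not self.receiver_queue.full():
            self.spill_file.seek(self.read_pos)
            batch = pickle.load(self.spill_file)
            self.read_pos = self.spill_file.tell()
            self.receiver_queue.put_nowait(batch)
            self.spilled -= 1


receiver = queue.Queue(maxsize=3)      # three-batch receiver buffer
sender = SpillingSender(receiver)
for i in range(5):
    sender.send([i])                   # batches 3 and 4 go to disk; sender never blocks
spilled_count = sender.spilled

received = []
while not receiver.empty() or sender.spilled:
    received.append(receiver.get_nowait())
    sender.drain()
```

The sender never blocks, at the cost of extra disk I/O, and the receiver still sees batches in their original order.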
   
   Yet another solution is to run the Sort in multiple threads so that it spills and consumes concurrently. (The current design is single-threaded and is pretty aggressive about how much is spilled: we inherited that logic from the "first-generation" sort spill code.)
   
   Now, I've focused on the Sort because I know that code best. But, the same is true for other blocking operators: joins, aggregations, etc.
   
   When Drill is used embedded, or at small scale, you will never see this issue. When Drill runs at large scale, so that spilling kicks in, then this issue becomes the dominant problem that prevents high performance.
   
   Sorry that this is all rather complex. Since it is complex, it goes unseen and unfixed. This is probably even a well-known problem in distributed computing circles: if anyone knows of a good reference, please post it.
   
   For Drill 2.0, we have to decide if we want to stay in the large-scale distributed system business, or just diddle around as an embedded desktop app to read PDF files and Excel spreadsheets. At scale, we have to fix the above issues. As a desktop app, we can rip out all the complex distributed cruft that no one understands and just be a decent several node solution.
   
   But, there is no path that says we compete with Presto/Trino/Impala, etc. at scale while focusing on desktop concerns. Distributed systems are *hard*, and we have to know what we're doing to offer an effective solution.

