Posted to issues@drill.apache.org by "Paul Rogers (JIRA)" <ji...@apache.org> on 2017/07/02 01:18:03 UTC

[jira] [Created] (DRILL-5656) Streaming Agg Batch forces sort to retain in-memory batches past NONE

Paul Rogers created DRILL-5656:
----------------------------------

             Summary: Streaming Agg Batch forces sort to retain in-memory batches past NONE
                 Key: DRILL-5656
                 URL: https://issues.apache.org/jira/browse/DRILL-5656
             Project: Apache Drill
          Issue Type: Bug
    Affects Versions: 1.10.0
            Reporter: Paul Rogers


Drill provides the {{StreamingAggBatch}} operator. The operator performs quite a bit of processing in the template used to generate code; see {{StreamingAggTemplate.doWork()}}. Consider an input operator that returns one batch (with {{OK_NEW_SCHEMA}}), then returns {{NONE}}.

The top part of {{doWork()}} iterates over the incoming rows of the first batch (indexed by an SV4 in this case). Then comes a loop to process the remaining batches:

{code}
          InternalBatch previous = new InternalBatch(incoming, context);
          while (true) {
            IterOutcome out = outgoing.next(0, incoming);
{code}

For an SV4, the above code does not clone or transfer the incoming data; it simply retains a reference to it. This is because of the way an SV4 works: an SV4 iterates over a fixed hyper-batch, that is, a set of batches. Consider the sort. The first call to the sort's {{next()}} returns a virtual batch with references to, say, five underlying batches. The next call to {{next()}} returns a different set of references to the same five batches. Each virtual batch consists of a set of references that read out records in sorted order. This is complex, but it avoids unnecessary data copies.
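
As a rough illustration of the idea (not Drill's actual classes), the sketch below models a hyper-batch as a list of plain arrays and a selection vector as an array of 4-byte entries. The class name {{HyperBatchSketch}} and its methods are hypothetical, and the encoding (upper 16 bits for the batch index, lower 16 bits for the row within that batch) is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a hyper-batch read through a
// 4-byte selection vector. These are not Drill's real classes.
final class HyperBatchSketch {
    // Each array stands in for one in-memory batch held by the sort.
    final List<int[]> batches = new ArrayList<>();

    // Assumed encoding: upper 16 bits = batch index, lower 16 bits = row index.
    static int encode(int batchIndex, int rowIndex) {
        return (batchIndex << 16) | (rowIndex & 0xFFFF);
    }

    // Resolves one selection-vector entry to a value. Nothing is copied;
    // the entry is only a pointer into the retained batches.
    int read(int sv4Entry) {
        int batchIndex = sv4Entry >>> 16;
        int rowIndex = sv4Entry & 0xFFFF;
        return batches.get(batchIndex)[rowIndex];
    }

    // Reads out one "virtual batch" in the order given by the selection vector.
    int[] readAll(int[] sv4) {
        int[] out = new int[sv4.length];
        for (int i = 0; i < sv4.length; i++) {
            out[i] = read(sv4[i]);
        }
        return out;
    }
}
```

With two batches holding {{30, 10}} and {{20, 40}}, a selection vector that points at (0,1), (1,0), (0,0), (1,1) reads the values out in sorted order while the batches themselves stay where they are. The point of the sketch is that every entry remains valid only as long as all of the underlying batches are retained.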

Consider what happens when the sort reaches the end of the virtual batches. The sort has a large amount of data sitting in memory (potentially many GB). The sort is done; there is nothing left to deliver downstream. A tidy implementation would free up the batches so that the memory can be better used elsewhere.

Doing that, however, causes the following streaming agg code to fail with an {{IndexOutOfBoundsException}}:

{code}
            switch (out) {
            case NONE:
              ...
              if (addedRecordCount > 0) {
                outputToBatchPrev(previous, previousIndex, outputCount);
{code}

That is, on {{NONE}}, the streaming agg batch wants to reference the data from the prior batch, which it does by holding a reference to the hyper-vectors from (in this case) the sort.

The result is that the sort *cannot* release memory when returning NONE; it must retain all the in-memory batches (since the last virtual batch might reference any of them.)
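
The failure mode can be reproduced in miniature. The sketch below is a simplified model, not Drill code: {{EagerSortSketch}} and {{LateReaderSketch}} are hypothetical names, a {{boolean}} return from {{next()}} stands in for the {{IterOutcome}} (with {{false}} playing the role of {{NONE}}), and the "tidy" sort frees its batches as soon as it reports the end of data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sort that owns the in-memory batches.
final class EagerSortSketch {
    private final List<int[]> batches = new ArrayList<>();
    private boolean delivered = false;

    EagerSortSketch(List<int[]> data) {
        batches.addAll(data);
    }

    // Returns true once (one virtual batch), then false (standing in for NONE).
    // On "NONE" this tidy sort releases everything it holds.
    boolean next() {
        if (!delivered) {
            delivered = true;
            return true;
        }
        batches.clear();
        return false;
    }

    int read(int batchIndex, int rowIndex) {
        return batches.get(batchIndex)[rowIndex];
    }
}

// Mimics the streaming agg pattern: remember a position in the previous
// batch, then dereference it only after the upstream has reported NONE.
final class LateReaderSketch {
    static String run(EagerSortSketch sort) {
        int previousBatch = 0;
        int previousRow = 0;
        while (sort.next()) {
            previousRow = 1; // last row seen in the virtual batch
        }
        try {
            return "value=" + sort.read(previousBatch, previousRow);
        } catch (IndexOutOfBoundsException e) {
            return "IndexOutOfBoundsException";
        }
    }
}
```

In this model the late read fails because the batch list is already empty, which is the same shape of problem described above: the downstream reference outlives the data unless the sort holds on to its batches past {{NONE}}.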

The next opportunity for the sort to release memory is on {{close()}}. The problem is that the operators above the streaming agg might include other memory-intensive operations. Since the sort cannot release its memory, those other operations suffer. The sort memory is retained through, say, another sort or hash join, all the way to query completion.

The reason for this JIRA (rather than just fixing the bug) is that the present iterator interface has no way to work around this issue cleanly. Four "hacks" are possible:

1. The streaming agg calls {{close()}} on the incoming after finishing with the previous incoming batch. (The problem is, all possible incoming operators must be able to handle multiple calls to close: one from streaming agg, another from the segment executor.)
2. The streaming agg calls {{next()}} on the incoming batch a second time after receiving {{NONE}}. The sort can track this case and release its memory. The problem is that the iterator validator (and probably other operators) forbids this pattern.
3. Add a new method to the iterator protocol, {{cleanup}}, that is a partial close: it tells the operator to release all resources after the operator returns {{NONE}}.
4. Special case hack if the type of the incoming is the sort: cast to that operator and call an operator-specific method.
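
To make option 3 more concrete, here is a minimal sketch of what a partial close might look like. Everything in it is an assumption for illustration: {{UpstreamSketch}}, {{ReleasingSortSketch}} and {{AggSketch}} are hypothetical names, {{next()}} returns a {{boolean}} in place of an {{IterOutcome}}, and none of this is Drill's existing iterator interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical iterator protocol extended with a partial close.
interface UpstreamSketch extends AutoCloseable {
    boolean next();   // false stands in for NONE
    void cleanup();   // downstream promises not to touch prior batches again
    @Override
    void close();     // full shutdown; must remain safe after cleanup()
}

final class ReleasingSortSketch implements UpstreamSketch {
    private final List<int[]> batches = new ArrayList<>();
    private boolean delivered = false;

    ReleasingSortSketch(List<int[]> data) {
        batches.addAll(data);
    }

    @Override
    public boolean next() {
        if (!delivered) {
            delivered = true;
            return true;
        }
        return false; // batches are retained past "NONE"
    }

    int read(int batchIndex, int rowIndex) {
        return batches.get(batchIndex)[rowIndex];
    }

    int retainedBatches() {
        return batches.size();
    }

    @Override
    public void cleanup() {
        batches.clear(); // release memory as soon as downstream says it is done
    }

    @Override
    public void close() {
        batches.clear(); // idempotent, so a later close() is harmless
    }
}

final class AggSketch {
    // Finishes with the previous batch first, then signals the upstream.
    static int drain(ReleasingSortSketch sort) {
        int previousRow = 0;
        while (sort.next()) {
            previousRow = 1;
        }
        int last = sort.read(0, previousRow); // still valid after "NONE"
        sort.cleanup();
        return last;
    }
}
```

The design choice in this sketch is that the downstream operator, not the sort, decides when the retained batches are no longer needed, and {{close()}} keeps its usual once-per-operator meaning.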



