Posted to dev@drill.apache.org by sachouche <gi...@git.apache.org> on 2018/04/23 01:10:00 UTC

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

GitHub user sachouche opened a pull request:

    https://github.com/apache/drill/pull/1237

    DRILL-6348: Fixed code so that Unordered Receiver reports its memory …

    Problem Description -
    - The Unordered Receiver doesn't report any memory usage because the RPC infrastructure assigns ownership of the allocated memory to the minor fragment container
    
    Proposed Fix -
    - Modified the code to transfer ownership of the received DrillBuf memory from the parent fragment to the operator (as discussed with @parthchandra)
    - Made sure that the buffer accounting logic is preserved
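A minimal sketch of the accounting change described in the fix, assuming a simplified two-level allocator model. `Allocator`, `Buffer` and `transferTo` here are hypothetical stand-ins, not Drill's actual `BufferAllocator`/`DrillBuf` API: the RPC layer charges the received buffer to the fragment, and the receiver then moves the charge (not the bytes) to its own operator allocator, so per-operator reporting reflects it.

```java
// Hypothetical, simplified model of buffer ownership; not Drill's real allocator API.
class ReceiverOwnershipSketch {

  static final class Allocator {
    final String name;
    private long allocated; // bytes currently charged to this allocator

    Allocator(String name) {
      this.name = name;
    }

    long allocated() {
      return allocated;
    }

    Buffer allocate(long size) {
      allocated += size;
      return new Buffer(this, size);
    }
  }

  static final class Buffer {
    private Allocator owner;
    final long size;

    Buffer(Allocator owner, long size) {
      this.owner = owner;
      this.size = size;
    }

    Allocator owner() {
      return owner;
    }

    // Moves the accounting for this buffer to another allocator; no bytes are copied.
    void transferTo(Allocator target) {
      if (target == owner) {
        return;
      }
      owner.allocated -= size;
      target.allocated += size;
      owner = target;
    }
  }

  public static void main(String[] args) {
    Allocator fragment = new Allocator("fragment");
    Allocator receiver = new Allocator("unordered-receiver");

    // The RPC layer allocates the incoming data against the fragment.
    Buffer incoming = fragment.allocate(8L * 1024 * 1024);
    System.out.println("before: fragment=" + fragment.allocated() + " receiver=" + receiver.allocated());

    // The receiver takes ownership when it loads the batch.
    incoming.transferTo(receiver);
    System.out.println("after:  fragment=" + fragment.allocated() + " receiver=" + receiver.allocated());
  }
}
```

Running `main` shows all 8388608 bytes charged to the fragment before the transfer and to the receiver after it; the total never changes, only the owner does.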
    
    @parthchandra, can you please review this pull request?
    
    Thanks!

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sachouche/drill DRILL-6348

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1237.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1237
    
----
commit 9ab501a3fc3ebbda44c1478ad9db6711192b8ca1
Author: Salim Achouche <sa...@...>
Date:   2018-04-23T01:02:35Z

    DRILL-6348: Fixed code so that Unordered Receiver reports its memory usage

----


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184728292
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
     
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    +    RawFragmentBatch batch;
    +    try {
    +      stats.startWait();
    +      batch = getNextBatch();
    +
    +      // skip over empty batches. we do this since these are basically control messages.
    +      while (batch != null && batch.getHeader().getDef().getRecordCount() == 0
    --- End diff --
    
    Ignore this comment as I thought you were releasing the returned batch.
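The loop in the diff above is cut off, so the following is only a hypothetical reconstruction of the "skip empty batches" idea, with a plain queue standing in for the receiver's batch source. Releasing the skipped empty batches (and not the returned one, per the comment above) is an assumption of this sketch; `RawBatch` and its fields are illustrative, and the `stats.startWait()` bookkeeping is omitted.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of "skip empty batches"; not the actual UnorderedReceiverBatch code.
class SkipEmptyBatchesSketch {

  static final class RawBatch {
    final int recordCount;
    boolean released;

    RawBatch(int recordCount) {
      this.recordCount = recordCount;
    }

    void release() {
      released = true;
    }
  }

  // Stand-in for the receiver's source of incoming batches.
  private final Deque<RawBatch> queue;

  SkipEmptyBatchesSketch(Deque<RawBatch> queue) {
    this.queue = queue;
  }

  // Returns null once the source is exhausted (end of stream).
  private RawBatch getNextBatch() {
    return queue.pollFirst();
  }

  // Returns the next batch with at least one record, or null at end of stream.
  RawBatch getNextNotEmptyBatch() {
    RawBatch batch = getNextBatch();
    // Empty batches are treated as control messages: release them and keep reading.
    while (batch != null && batch.recordCount == 0) {
      batch.release();
      batch = getNextBatch();
    }
    // The returned batch is NOT released here; the caller now owns it.
    return batch;
  }

  public static void main(String[] args) {
    Deque<RawBatch> q = new ArrayDeque<>();
    q.add(new RawBatch(0));
    q.add(new RawBatch(0));
    q.add(new RawBatch(42));
    SkipEmptyBatchesSketch receiver = new SkipEmptyBatchesSketch(q);
    System.out.println(receiver.getNextNotEmptyBatch().recordCount);
    System.out.println(receiver.getNextNotEmptyBatch());
  }
}
```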


---

Re: [DISCUSS] batch ownership

Posted by Paul Rogers <pa...@yahoo.com.INVALID>.
Hi Vlad,

Glad to see you are becoming an expert in the mechanics of data batch handling. This is a complex area that deserves the care and attention you are investing.

Drill's current behavior reflects the design decisions of Drill's original authors. Unfortunately, those authors are no longer available. (If you are out there, lurking, now would be a great time to help out Vlad by explaining the original design.) Failing that, we have to use our collective knowledge of the intended design. Plus, we should explore ways to improve the design, as you seem to be doing.

Drill has a complex memory model that works only if each operator ("record batch" in Drill's unfortunate terminology) takes ownership of each incoming record batch ("vector container" in Drill's terminology). Recall that each operator has an operator-specific memory allocator with its own budget (though, at present, the budget numbers are completely artificial and nonsensical). In addition, the minor fragment as a whole has a budget.

For the operator budget to work, the operator must take ownership of incoming batches, and give up ownership of outgoing batches. Why? Because doing so is the only way to track the memory that each operator uses in its operator-specific allocator. While this may not be the ideal design, it is how Drill works today.
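To illustrate why per-operator tracking depends on operators owning their batches, here is a hypothetical model (not Drill's allocator classes) of a fragment allocator with child operator allocators: a charge against an operator's allocator also counts against the fragment, and an allocation fails if it would exceed either budget.

```java
// Hypothetical model of a fragment allocator with child operator allocators; not Drill's API.
class AllocatorHierarchySketch {

  static final class Allocator {
    final String name;
    final Allocator parent; // null for the fragment allocator
    final long limit;
    private long allocated;

    Allocator(String name, Allocator parent, long limit) {
      this.name = name;
      this.parent = parent;
      this.limit = limit;
    }

    long allocated() {
      return allocated;
    }

    Allocator newChild(String childName, long childLimit) {
      return new Allocator(childName, this, childLimit);
    }

    // Charges size bytes here and to every ancestor; fails, changing nothing,
    // if any allocator on the path would exceed its limit.
    boolean tryAllocate(long size) {
      for (Allocator a = this; a != null; a = a.parent) {
        if (a.allocated + size > a.limit) {
          return false;
        }
      }
      for (Allocator a = this; a != null; a = a.parent) {
        a.allocated += size;
      }
      return true;
    }

    void release(long size) {
      for (Allocator a = this; a != null; a = a.parent) {
        a.allocated -= size;
      }
    }
  }

  public static void main(String[] args) {
    Allocator fragment = new Allocator("fragment", null, 100);
    Allocator project = fragment.newChild("project", 60);
    Allocator sort = fragment.newChild("sort", 80);

    System.out.println(project.tryAllocate(50)); // fits both the project and fragment limits
    System.out.println(sort.tryAllocate(60));    // fits sort's limit, but the fragment would reach 110
    System.out.println(sort.tryAllocate(50));    // the fragment reaches exactly 100
    System.out.println(fragment.allocated());
  }
}
```

The calls in `main` print `true`, `false`, `true`, and the fragment ends at exactly its 100-byte limit. If an operator never took ownership, its allocator would show nothing even while its batches consumed the fragment's budget.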

If we move fully to the budget-based design, then this level of operator control will no longer be necessary and will only add complexity. Under the budget model, only the minor fragment as a whole needs an allocator; each operator plays its part within the overall fragment budget. A planning step works out the memory budget for the query, the minor fragments and each operator. This is all explained in [1].

Under the budget model, each operator attempts to stay within its budget, spilling to disk as needed. The budget model works only if "single batch" operators (such as Project, Filter, etc.) are given sufficient memory to hold two batches. This, in turn, requires that we control the size of each batch as Padma and others are doing.
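As a rough illustration of the "two batches" rule, here is a hypothetical planning step. The even split of the remainder among buffering operators, and the requirement that each buffering operator also get at least two batches, are assumptions made for this sketch, not the planner described in [1].

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical budget-planning step; the split policy is an assumption for illustration.
class BudgetPlanSketch {

  static Map<String, Long> plan(long fragmentBudget, long batchSize,
                                List<String> singleBatchOps, List<String> bufferingOps) {
    Map<String, Long> budgets = new LinkedHashMap<>();
    long reserved = 0;
    for (String op : singleBatchOps) {
      long need = 2 * batchSize; // room for one incoming and one outgoing batch
      budgets.put(op, need);
      reserved += need;
    }
    long remaining = fragmentBudget - reserved;
    if (remaining < 0) {
      throw new IllegalArgumentException("fragment budget too small for single-batch operators");
    }
    if (!bufferingOps.isEmpty()) {
      long share = remaining / bufferingOps.size();
      if (share < 2 * batchSize) {
        throw new IllegalArgumentException("fragment budget too small for buffering operators");
      }
      for (String op : bufferingOps) {
        budgets.put(op, share); // these operators spill to disk to stay within their share
      }
    }
    return budgets;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    Map<String, Long> budgets = plan(100 * mb, 8 * mb,
        Arrays.asList("project", "filter"), Arrays.asList("sort", "hash-agg"));
    for (Map.Entry<String, Long> e : budgets.entrySet()) {
      System.out.println(e.getKey() + " = " + e.getValue() / mb + " MB");
    }
  }
}
```

With a 100 MB fragment budget and 8 MB batches, Project and Filter get 16 MB each, and the remaining 68 MB is split into 34 MB each for Sort and Hash-Agg. This is why controlling batch size matters: the fixed reservations scale directly with it.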

That said, today exchanges *might* be special. My understanding is that some can receive a single batch from the network and feed that single batch to multiple slices ("minor fragments") of the same operator. This happens in, say, a broadcast exchange.

You mention SV2 mode. In fact, SV2 mode should operate the same as "plain" batches: an SV2 is a single indirection vector on a single batch of data. Perhaps you meant "SV4 mode." Indeed, SV4 is special since an SV4 sits atop a large collection of batches and simulates a batch by picking out a collection of rows across the many batches. SV4 is used in the output of an in-memory sort (and perhaps in other places). There is no transfer of ownership in SV4 mode because the same batches will be used over and over until all data is delivered. It is the responsibility of the Sort operator to release the collection of batches once it has delivered all results (or the query fails).
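The difference between SV2 and SV4 can be shown with plain arrays. This sketch assumes an SV4 layout that packs the batch index into the upper 16 bits and the row offset into the lower 16 bits of each 4-byte entry; treat that layout, and the method names, as assumptions for illustration.

```java
import java.util.Arrays;

// Hypothetical illustration of SV2 vs SV4 indirection using plain int arrays as "vectors".
class SelectionVectorSketch {

  // SV2: 2-byte (char) row indexes into one batch.
  static int[] readSv2(int[] batch, char[] sv2) {
    int[] out = new int[sv2.length];
    for (int i = 0; i < sv2.length; i++) {
      out[i] = batch[sv2[i]];
    }
    return out;
  }

  // SV4 entry, assuming (batch index << 16) | row index, i.e. at most 65,536 rows per batch.
  static int encodeSv4(int batchIndex, int rowIndex) {
    return (batchIndex << 16) | (rowIndex & 0xFFFF);
  }

  // SV4: each entry may point into a different batch of a shared collection.
  static int[] readSv4(int[][] batches, int[] sv4) {
    int[] out = new int[sv4.length];
    for (int i = 0; i < sv4.length; i++) {
      int batchIndex = sv4[i] >>> 16;
      int rowIndex = sv4[i] & 0xFFFF;
      out[i] = batches[batchIndex][rowIndex];
    }
    return out;
  }

  public static void main(String[] args) {
    int[] batch = {10, 20, 30, 40};
    // A filter keeps rows 1 and 3 of one batch without copying any data.
    System.out.println(Arrays.toString(readSv2(batch, new char[] {1, 3})));

    int[][] batches = {{5, 1, 9}, {7, 3}};
    // An in-memory sort emits rows in order across both batches.
    int[] sv4 = {
        encodeSv4(0, 1), encodeSv4(1, 1), encodeSv4(0, 0), encodeSv4(1, 0), encodeSv4(0, 2)
    };
    System.out.println(Arrays.toString(readSv4(batches, sv4)));
  }
}
```

`main` prints `[20, 40]` for the SV2 case and `[1, 3, 5, 7, 9]` for the SV4 case. In both cases no row data is copied, and in the SV4 case the whole collection of batches must stay alive until the last entry has been read.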


Enough for this response. I'll send additional responses for your other points.

The key concept to keep in mind is that the Drill memory system, as a whole, is quite complex. It can certainly be improved (as we are doing with the batch handling revisions.) But, we must consider the entire system when considering changes to any one part of the system. It is a complex topic; it is great that we have someone with your experience exploring our options.

Thanks,
- Paul

[1]  https://github.com/paul-rogers/drill/wiki/Batch-Handling-Upgrades


 

    On Sunday, April 29, 2018, 9:26:24 PM PDT, Vlad Rozov <vr...@apache.org> wrote:  
 
 I did not mean that a pass-through operator should not take
ownership of a batch it processes. My question was whether they do so
and if they do, when and how. As far as I can see in the
ProjectorTemplate code, the transfer is not done in all cases, and when
Projector operates in sv2 mode, there is no transfer of ownership.
Additionally, when there is a transfer, it is done when the processing
of the batch is almost complete. IMO, such behavior is counterintuitive,
and I would expect that if there is a transfer of ownership, it is
part of RecordBatch.next(), meaning that once an operator gets a
reference to a record batch, it owns it. At this point, an operator may
consume the content of the record batch and create a completely new record
batch, or it can modify the record batch and pass it to the next
downstream operator.

The behavior above applies to an operator that consumes record batches
from another operator. An input operator (scan or edge operator) is an
operator that produces record batches from an external source (Parquet
files, HBase, Kafka, etc.). IMO, when such operators create record batches,
they should allocate memory using the operator allocator rather than the
fragment allocator. If the memory is allocated using the fragment allocator,
there is no point changing ownership when batch construction is complete
and the batch is passed to the next operator.

The same approach applies to senders and receivers. Senders get batches
from the upstream operators, taking ownership of those batches, and send
data to receivers. Receivers get data from senders and reconstruct
record batches. It is the business logic of senders and receivers, and
they may rely on other libraries (RPC and Netty) or classes to handle
serialization/deserialization, buffering, acknowledgment, back-pressure
or dealing with the network. From the point of view of other Drill operators,
senders and receivers are operators responsible for passing record
batches from one drillbit to another.

Following your approach, it is necessary to modify MergingReceiver as
well. It also pulls batches from a queue (see
MergingRecordBatch.getNext()), but instead of almost immediately passing
it to the next operator as UnorderedReceiver does, MergingReceiver creates a
new record batch from the batches that it pulls from the queue. To be
consistent with the proposed changes to UnorderedReceiver, it is necessary to
change the ownership of the batches that MergingReceiver pulls as well,
especially since MergingReceiver may keep a reference to the original batch
much longer than UnorderedReceiver does (while it waits for batches
from other drillbits).

I don't see a reason to modify both UnorderedReceiver and
MergingReceiver. Instead, I think we should modify the allocator used when
batches are created in the first place, before they are added to a queue.

Thank you,

Vlad

On 4/27/18 18:10, salim achouche wrote:
> Correction for example II, as Drill uses a single thread per pipeline (a
> batch is fully processed before the next one is; only the receipt of
> batches can happen concurrently):
> - Using batch identifiers for more clarity
> - t0: (fragment, opr-1, opr-2) = ([b1], [], [])
> - t1: (fragment, opr-1, opr-2) = ([b2], [b1], [])
> - t2: (fragment, opr-1, opr-2) = ([b3,b2], [], [b1])
>        (fragment, opr-1, opr-2) = ([b3], [b2], [])
>        (fragment, opr-1, opr-2) = ([b3], [], [b2])
>        (fragment, opr-1, opr-2) = ([], [b3], [])
>        (fragment, opr-1, opr-2) = ([], [], [b3])
>
> The point remains the same: a change of ownership in pass-through
> operators is still valid, as it doesn't inflate resource allocation at
> any given point in time.
>
>
> On Sat, Apr 28, 2018 at 12:42 AM, salim achouche <sa...@gmail.com>
> wrote:
>
>> Another point: I don't see a functional benefit in avoiding a change of
>> ownership for pass-through operators. Consider the following use cases:
>>
>> Example I -
>> - A single batch of size 8MB is received at time t0 and then passed
>> through a set of pass-through operators
>> - At time t1 it is owned by operator Opr1, at time t2 by operator Opr2, and
>> so forth
>> - Assume we report memory usage at times t0 - t2; this is what will be seen
>> - t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
>> - t1: (fragment, opr-1, opr-2) = (0, 8MB, 0)
>> - t2: (fragment, opr-1, opr-2) = (0, 0, 8MB)
>>
>> Example II -
>> - Multiple batches of size 8MB are received at times t0 - t2 and then
>> passed through a set of pass-through operators
>> - At time t1 a batch is owned by operator Opr1, at time t2 by operator
>> Opr2, and so forth
>> - Assume we report memory usage at times t0 - t2; this is what will be seen
>> - t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
>> - t1: (fragment, opr-1, opr-2) = (8MB, 8MB, 0)
>> - t2: (fragment, opr-1, opr-2) = (8MB, 8MB, 8MB)
>>
>>
>> The key thing is that we clarify our reporting metrics so that users do
>> not draw the wrong conclusions.
>>
>> Regards,
>> Salim
>>
>> On Fri, Apr 27, 2018 at 11:47 PM, salim achouche <sa...@gmail.com>
>> wrote:
>>
>>> Vlad,
>>>
>>> - My understanding is that operators need to take ownership of incoming
>>> buffers (using the vector method transferTo())
>>>
>>> - My view is not that receivers are pass-through; instead, I feel that
>>> sender & receiver operators should focus on their business logic
>>>
>>> - It just happens that the unordered-receiver does very little
>>> (it deserializes the batch through the BatchLoader)
>>>
>>> - Contrast this with the merge-receiver, which needs to consume data from
>>> multiple inputs to provide ordered batches
>>>
>>> - The operator implementation will dictate how many batches are consumed
>>> (this should have nothing to do with communication concerns)
>>>
>>> - Intricacies of buffering, acking, back-pressuring, etc. are ideally left
>>> to a communication module
>>>
>>>
>>> My intent is to consistently report on resource usage (I am fine if we
>>> exclude pass-through operators as long as we do it consistently). The next
>>> enhancement that I am planning to do is to report on the fragment's
>>> buffered batches. This will enable us to account for such resources when
>>> analyzing memory usage.
>>>
>>> On Fri, Apr 27, 2018 at 9:50 PM, vrozov <gi...@git.apache.org> wrote:
>>>
>>>> Github user vrozov commented on the issue:
>>>>
>>>>      https://github.com/apache/drill/pull/1237
>>>>
>>>>      IMO, it would be good to understand what other operators do as well.
>>>> For example, what do the Project or Filter operators do? Do they take ownership of
>>>> incoming batches? And if they do, when is the ownership taken?
>>>>
>>>>      I do not suggest that we change how Sender and Receiver control
>>>> **all** aspects of communication, at least not as part of this JIRA/PR. The
>>>> difference between my approach and yours is whether or not UnorderedReceiver and
>>>> other receivers are pass-through operators. My view is that receivers are
>>>> not pass-through operators but buffering operators, as they receive
>>>> batches from the network and buffer them until downstream operators are
>>>> ready to consume those batches. In your view, receivers are pass-through
>>>> operators that get batches from the fragment queue or some other queue and pass
>>>> them downstream. As there is no wait and no processing between getting a
>>>> batch from the fragment queue and passing it to the next operator, I don't see
>>>> why a receiver needs to take ownership.
>>>>
>>>>
>>>> ---
>>>>
>>>
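Vlad's alternative in the quoted message above, charging received data to the receiver's own allocator at the moment it is queued so that no transfer is needed when the receiver dequeues it, can be sketched as follows. `ReceiverQueue`, `enqueue` and the single-counter allocator are hypothetical simplifications, not Drill classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: memory for queued batches is charged to the receiver's own
// allocator at enqueue time, so dequeuing needs no ownership transfer.
class AllocateAtEnqueueSketch {

  static final class Allocator {
    final String name;
    private long allocated;

    Allocator(String name) {
      this.name = name;
    }

    long allocated() {
      return allocated;
    }

    void charge(long bytes) {
      allocated += bytes;
    }
  }

  static final class QueuedBatch {
    final long bytes;
    final Allocator owner;

    QueuedBatch(long bytes, Allocator owner) {
      this.bytes = bytes;
      this.owner = owner;
    }
  }

  static final class ReceiverQueue {
    private final Queue<QueuedBatch> queue = new ArrayDeque<>();
    private final Allocator receiverAllocator;

    ReceiverQueue(Allocator receiverAllocator) {
      this.receiverAllocator = receiverAllocator;
    }

    // Called by the network side: the data is charged to the receiver, not the fragment.
    void enqueue(long bytes) {
      receiverAllocator.charge(bytes);
      queue.add(new QueuedBatch(bytes, receiverAllocator));
    }

    // Called by the receiver operator: the batch is already owned by it; null if empty.
    QueuedBatch dequeue() {
      return queue.poll();
    }
  }

  public static void main(String[] args) {
    Allocator fragment = new Allocator("fragment");
    Allocator receiver = new Allocator("receiver");
    ReceiverQueue q = new ReceiverQueue(receiver);
    q.enqueue(8);
    q.enqueue(8);
    // Batches that are buffered but not yet consumed already count against the receiver.
    System.out.println("fragment=" + fragment.allocated() + " receiver=" + receiver.allocated());
    System.out.println("owner of dequeued batch: " + q.dequeue().owner.name);
  }
}
```

A side effect of this design is that batches buffered but not yet consumed already count against the receiver, which matches the view of receivers as buffering operators rather than pass-through ones.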

  

Re: [DISCUSS] batch ownership

Posted by Paul Rogers <pa...@yahoo.com.INVALID>.
Hi Vlad,
More responses.
> The same approach [as for internal operators] applies to senders and receivers. Senders get batches
> from the upstream operators, taking ownership of those batches, and send
> data to receivers.

Senders receive data from an "upstream" operator, then serialize it over the wire. As a result, Senders take ownership from the upstream operator, but then must transfer ownership to Netty. Here I'll speculate: I believe that we create a Netty composite buffer that strings together the buffers that underlie the value vectors in the outgoing record batch. (Yes, there are many layers in play.)

Netty does not know about our allocator model. It does, however, have a reference count. So, my guess is that the Sender somehow gives up ownership of the outgoing buffer in the sense of the Drill allocator, but lets Netty drop the reference count once Netty has sent the buffer.

I believe you are quite familiar with Netty, so perhaps you can dig around here and explain how this actually works.

> Receivers get data from senders and reconstruct
> record batches.

You are right logically. But, physically there is a difference. Data arrives via Netty, which allocates buffers for the data. Receivers take these raw buffers and turn them into batches. Here things get even more complex (if that is possible). The Receiver creates multiple vectors on top of a single Netty buffer. That is, multiple vectors were serialized together and are read together. Much of the complexity of Drill's memory model comes from the ability to create multiple (logical) DrillBufs on top of a single (physical) Netty buffer. This is where we need reference counts (so we know when the last shared use goes away), and where we need the UDLE/DrillBuf separation.
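Since the exact mechanism is not known here, the following is only a generic model of the idea: several logical buffers (one per vector) are views over one physical network buffer and share its reference count, so the physical memory is freed only when the last view is released. `PhysicalBuffer` and `Slice` are hypothetical names, not Drill's DrillBuf or ledger classes.

```java
// Hypothetical model: several logical buffers (one per vector) share one physical
// network buffer; the physical memory is freed only when the last reference goes away.
class SharedBufferSketch {

  static final class PhysicalBuffer {
    final byte[] bytes;
    private int refCount = 1; // held by the network layer initially
    boolean freed;

    PhysicalBuffer(int size) {
      bytes = new byte[size];
    }

    int refCount() {
      return refCount;
    }

    void retain() {
      if (freed) {
        throw new IllegalStateException("buffer already freed");
      }
      refCount++;
    }

    void release() {
      if (freed) {
        throw new IllegalStateException("double release");
      }
      if (--refCount == 0) {
        freed = true; // a real allocator would return the memory here
      }
    }
  }

  // A logical view over [offset, offset + length) of the shared physical buffer.
  static final class Slice {
    final PhysicalBuffer parent;
    final int offset;
    final int length;

    Slice(PhysicalBuffer parent, int offset, int length) {
      parent.retain();
      this.parent = parent;
      this.offset = offset;
      this.length = length;
    }

    byte get(int i) {
      return parent.bytes[offset + i];
    }

    void release() {
      parent.release();
    }
  }

  public static void main(String[] args) {
    PhysicalBuffer wire = new PhysicalBuffer(12);
    // The receiver carves three vectors out of one serialized message.
    Slice a = new Slice(wire, 0, 4);
    Slice b = new Slice(wire, 4, 4);
    Slice c = new Slice(wire, 8, 4);
    wire.release(); // the network layer drops its own reference
    System.out.println("refCount=" + wire.refCount() + " freed=" + wire.freed);
    a.release();
    b.release();
    c.release();
    System.out.println("refCount=" + wire.refCount() + " freed=" + wire.freed);
  }
}
```

`main` shows three outstanding references once the network layer drops its own, and the buffer is freed only after the third slice is released.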

So, again, Netty does not play the Drill "ownership" game, it only does reference counts. So the Receiver must convert from the Netty reference count of the big incoming buffer, to reference counts for each materialized vector, and create some kind of entry in Drill's allocator. I'm not sure how this is done; it would be great if you could figure this out.

Could this be done differently? Probably. Maybe serialize each buffer by itself so that Netty creates separate buffers for each. I'd guess the original authors started with this design and moved to the present one, perhaps for performance reasons. (Anyone know of the history here?)

> It is the business logic of senders and receivers, and
> they may rely on other libraries (RPC and Netty) or classes to handle
> serialization/deserialization, buffering, acknowledgment, back-pressure
> or dealing with the network. From the point of view of other Drill operators,
> senders and receivers are operators responsible for passing record
> batches from one drillbit to another.

True. Senders/Receivers should speak Drill operator protocol on one side, Netty protocol on the other. They are adapters. Is this not what you see?

> Following your approach, it is necessary to modify MergingReceiver as
> well. It also pulls batches from a queue (see
> MergingRecordBatch.getNext()), but instead of almost immediately passing
> it to the next operator as UnorderedReceiver does, MergingReceiver creates a
> new record batch from the batches that it pulls from the queue. To be
> consistent with the proposed changes to UnorderedReceiver, it is necessary to
> change the ownership of the batches that MergingReceiver pulls as well,
> especially since MergingReceiver may keep a reference to the original batch
> much longer than UnorderedReceiver does (while it waits for batches
> from other drillbits).

I personally don't know the details. But, in general, if one operator passes data to another, it should play by the Drill ownership rules if it works with vectors. If, instead, it works with buffers, then it should probably play by the Netty rules.

> I don't see a reason to modify both UnorderedReceiver and
> MergingReceiver. Instead, I think we should modify the allocator used when
> batches are created in the first place, before they are added to a queue.

My own suggestion here is that we may want to make use of an old-school technique that is still often handy: write up the design. Document the rules I've been doing my best to explain above. Add a detailed explanation of how Drill interfaces with Netty. Then, think through how we want to handle the Drill-operator-to-Netty interface.

Another particularly nasty area is the "Mux" operators. Several folks struggled to understand them and didn't get very far. This is not a good state to be in. We should really understand how they work. Perhaps understanding the most complex case will help shed light on the case under discussion.
Thanks,

- Paul


  

Re: [DISCUSS] batch ownership

Posted by Paul Rogers <pa...@yahoo.com.INVALID>.
Specific answers based on my understanding.

> I did not mean that a pass-through operator should not take
> ownership of a batch it processes. My question was whether they do so
> and if they do, when and how.

Yes, operators do take ownership, somewhere in the process of calling next() on their inputs. The exact place may vary between operators. In the Sort, for example, the code first checks the incoming batch size, spills sorted batches if needed to make space, then takes ownership. I'd go so far as to say that, if an operator does not take ownership, then it is a bug.

> As far as I can see in the
> ProjectorTemplate code, the transfer is not done in all cases, and when
> Projector operates in sv2 mode, there is no transfer of ownership.

Template code is code that is copied for each generated operator. In general, this code should be minimal. Code that is common to all operator instances should not reside in the template. Instead, it should reside in the operator (the so-called RecordBatch). There is really no reason to copy the same byte codes over and over, taking up space in the code cache.

That said, the code to take ownership is likely to be in the Project operator implementation. Look for a place that works with "transfer pairs"; they are the actual transfer mechanism. A quick glance at the code suggests this is done in ProjectRecordBatch.setupNewSchemaFromInput(). (An unfortunate name if we also do transfers there.)
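A transfer pair, in the sense used above, moves a vector's underlying buffer to another vector without copying it. Here is a hypothetical, simplified model of that mechanism with accounting attached; the `Vector`, `Allocator` and `TransferPair` classes are illustrative and do not reproduce the signatures of Drill's value-vector API.

```java
// Hypothetical model of a transfer pair; not Drill's ValueVector/TransferPair signatures.
class TransferPairSketch {

  static final class Allocator {
    final String name;
    long allocated;

    Allocator(String name) {
      this.name = name;
    }
  }

  static final class Vector {
    final Allocator allocator;
    int[] data; // stands in for the vector's underlying buffer

    Vector(Allocator allocator) {
      this.allocator = allocator;
    }

    void allocate(int n) {
      data = new int[n];
      allocator.allocated += 4L * n;
    }
  }

  static final class TransferPair {
    final Vector from;
    final Vector to;

    TransferPair(Vector from, Vector to) {
      this.from = from;
      this.to = to;
    }

    // Hands the source buffer to the target without copying, and moves the accounting with it.
    void transfer() {
      if (to.data != null) {
        to.allocator.allocated -= 4L * to.data.length; // target drops whatever it held before
      }
      long bytes = 4L * from.data.length;
      to.data = from.data;
      from.data = null; // the source vector is left empty
      from.allocator.allocated -= bytes;
      to.allocator.allocated += bytes;
    }
  }

  public static void main(String[] args) {
    Allocator scan = new Allocator("scan");
    Allocator project = new Allocator("project");
    Vector in = new Vector(scan);
    in.allocate(1000);
    Vector out = new Vector(project);
    new TransferPair(in, out).transfer();
    System.out.println(scan.name + "=" + scan.allocated + " " + project.name + "=" + project.allocated);
  }
}
```

`main` prints `scan=0 project=4000`: the 1000-element buffer now belongs to the Project side, and nothing was copied.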

> Additionally, when there is a transfer, it is done when the processing
> of the batch is almost complete.

It depends on what you mean by "almost complete." Since Project is single-threaded, there is no harm in doing the transfer later rather than sooner; the upstream operator won't be called until Project again calls next(). It makes sense to do it earlier, but it is not necessary.

> IMO, such behavior is counterintuitive,
> and I would expect that if there is a transfer of ownership, it is
> part of RecordBatch.next(), meaning that once an operator gets a
> reference to a record batch, it owns it.

Perhaps. But the Operator (that is, RecordBatch) protocol is a bit fussy. The next() call to a RecordBatch tells that RecordBatch to build a batch of data and make it available. An operator has no visibility into its parent (its downstream operator). The caller must do the transfer, as only the caller has visibility into both its own vector container and that of the upstream (incoming) record batch. Yes, this is quite confusing. Nothing beats stepping through several operators to see how this works in practice.

Here, I will put in a plug for the revised Operator classes in the "batch handling" code. The new classes try to disentangle the many bits of functionality combined in RecordBatch. There are three: 1) iterator protocol, 2) batch management, and 3) operator implementation. I believe we'll all understand this code better if we can separate these three concerns.
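One way to picture the three concerns as separate pieces, purely as a hypothetical sketch (these interfaces are not the actual classes from the batch-handling work): the iterator protocol only hands out batches, the operator logic only transforms one batch, and a small driver in between pulls from upstream and takes ownership of what it receives, mirroring the point that the caller performs the transfer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical separation of iterator protocol, batch management and operator logic.
class ThreeConcernsSketch {

  // (1) Iterator protocol: hand out the next batch, or null when there is no more data.
  interface BatchIterator {
    List<Integer> next();
  }

  // (3) Operator implementation: transform one batch, with no knowledge of neighbors.
  interface OperatorLogic {
    List<Integer> process(List<Integer> input);
  }

  // (2) Batch management: pull from upstream, take ownership, produce the outgoing batch.
  static final class OperatorDriver implements BatchIterator {
    private final BatchIterator upstream;
    private final OperatorLogic logic;
    int batchesTaken; // batches this driver has taken ownership of

    OperatorDriver(BatchIterator upstream, OperatorLogic logic) {
      this.upstream = upstream;
      this.logic = logic;
    }

    @Override
    public List<Integer> next() {
      List<Integer> incoming = upstream.next();
      if (incoming == null) {
        return null;
      }
      batchesTaken++; // the caller, not the upstream operator, takes ownership
      return logic.process(incoming);
    }
  }

  public static void main(String[] args) {
    Iterator<List<Integer>> source =
        Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)).iterator();
    BatchIterator scan = () -> source.hasNext() ? source.next() : null;
    OperatorLogic doubler = in -> {
      List<Integer> out = new ArrayList<>();
      for (int v : in) {
        out.add(v * 2);
      }
      return out;
    };
    OperatorDriver project = new OperatorDriver(scan, doubler);
    for (List<Integer> batch = project.next(); batch != null; batch = project.next()) {
      System.out.println(batch);
    }
  }
}
```

`main` prints `[2, 4, 6]` and then `[8, 10]`. The operator logic can be tested alone, and the ownership rule lives in exactly one place.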

> At this point, an operator may
> consume the content of the record batch and create a completely new record
> batch, or it can modify the record batch and pass it to the next
> downstream operator.

Just to be clear, record batches (specifically vectors) are immutable. It is not possible to modify a record batch. One can, however, reuse parts of it. A Filter can slap on an SV2. A Project can discard some vectors, add others, and retain still others. But, in both cases, the operator must produce a new batch based on those vectors. Specifically, each operator has its own VectorContainer that contains its own vectors. Sharing occurs at the level of the DrillBufs that underlie the vectors. (Again, quite confusing, but it makes sense once you understand the operator allocators we discussed previously.)

Part of the complexity comes from proper memory management. New vectors are allocated in the Project operator's allocator. Retained vectors are transferred from the upstream operator's allocator (ledger) to that of the Project operator. Discarded vectors are released (perhaps after being shifted into the Project operator's allocator).
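The bookkeeping in the previous paragraph can be modeled in a few lines, with each vector reduced to its byte count. This is a hypothetical sketch, not Project's actual code: retained vectors move from the upstream allocator to Project's, discarded ones are released, new ones are charged to Project, and the result is a new container rather than a modified incoming one.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of a Project-like operator building its own outgoing container.
// Each vector is reduced to its size in bytes.
class ProjectContainerSketch {

  static final class Allocator {
    long allocated;
  }

  static Map<String, Long> project(Map<String, Long> incoming, Allocator upstream, Allocator self,
                                   Set<String> keep, Map<String, Long> computed) {
    Map<String, Long> outgoing = new LinkedHashMap<>();
    for (Map.Entry<String, Long> e : incoming.entrySet()) {
      long bytes = e.getValue();
      upstream.allocated -= bytes; // every incoming vector leaves the upstream allocator
      if (keep.contains(e.getKey())) {
        self.allocated += bytes;   // retained: ownership moves to this operator
        outgoing.put(e.getKey(), bytes);
      }
      // discarded vectors are released and charged to no one
    }
    for (Map.Entry<String, Long> e : computed.entrySet()) {
      self.allocated += e.getValue(); // new vectors are allocated by this operator
      outgoing.put(e.getKey(), e.getValue());
    }
    incoming.clear(); // the incoming container no longer holds any vectors
    return outgoing;  // a new container, rather than a modified incoming one
  }

  public static void main(String[] args) {
    Allocator scan = new Allocator();
    Allocator proj = new Allocator();
    Map<String, Long> in = new LinkedHashMap<>();
    in.put("a", 400L);
    in.put("b", 400L);
    in.put("c", 800L);
    scan.allocated = 1600;
    Map<String, Long> computed = new LinkedHashMap<>();
    computed.put("a_plus_b", 400L);
    Map<String, Long> out = project(in, scan, proj, new HashSet<>(Arrays.asList("a", "c")), computed);
    System.out.println(out.keySet() + " scan=" + scan.allocated + " project=" + proj.allocated);
  }
}
```

Keeping `a` and `c` and computing one new 400-byte vector, `main` prints `[a, c, a_plus_b] scan=0 project=1600`.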

OK, again enough for one note. More to come.

Thanks,

- Paul
  



Re: [GitHub] drill issue #1237: DRILL-6348: Fixed code so that Unordered Receiver reports...

Posted by salim achouche <sa...@gmail.com>.
Correction for example II, as Drill uses a single thread per pipeline (a
batch is fully processed before the next one is; only the receipt of
batches can happen concurrently):
- Using batch identifiers for more clarity
- t0: (fragment, opr-1, opr-2) = ([b1], [], [])
- t1: (fragment, opr-1, opr-2) = ([b2], [b1], [])
- t2: (fragment, opr-1, opr-2) = ([b3,b2], [], [b1])
       (fragment, opr-1, opr-2) = ([b3], [b2], [])
       (fragment, opr-1, opr-2) = ([b3], [], [b2])
       (fragment, opr-1, opr-2) = ([], [b3], [])
       (fragment, opr-1, opr-2) = ([], [], [b3])

The point remains the same: a change of ownership in pass-through
operators is still valid, as it doesn't inflate resource allocation at
any given point in time.
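The corrected timeline can be replayed mechanically. The following hypothetical simulation (not Drill code) assumes one batch arrives per step for the first three steps and that the single pipeline thread moves a batch one stage per step; at every snapshot each batch is owned by exactly one of the fragment queue, opr-1 or opr-2.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical replay of the corrected timeline: one pipeline thread, two operators.
class OwnershipTimelineSketch {

  static String show(Deque<String> fragment, String opr1, String opr2) {
    return "(" + fragment + ", " + (opr1 == null ? "[]" : "[" + opr1 + "]")
        + ", " + (opr2 == null ? "[]" : "[" + opr2 + "]") + ")";
  }

  // arrivals.get(i) is delivered into the fragment queue at step i.
  static List<String> run(List<String> arrivals) {
    Deque<String> fragment = new ArrayDeque<>(); // received, not yet pulled by opr-1
    String opr1 = null;
    String opr2 = null;
    int next = 0;
    List<String> snapshots = new ArrayList<>();
    if (next < arrivals.size()) {
      fragment.addLast(arrivals.get(next++));
    }
    snapshots.add(show(fragment, opr1, opr2));
    while (!fragment.isEmpty() || opr1 != null || opr2 != null || next < arrivals.size()) {
      // Whatever opr-2 held has been passed downstream; it no longer owns it.
      opr2 = null;
      // The single pipeline thread then moves one batch one stage.
      if (opr1 != null) {
        opr2 = opr1;
        opr1 = null;
      } else if (!fragment.isEmpty()) {
        opr1 = fragment.pollFirst();
      }
      // Receiving can happen concurrently, so a new batch may land in the same step.
      if (next < arrivals.size()) {
        fragment.addLast(arrivals.get(next++));
      }
      snapshots.add(show(fragment, opr1, opr2));
    }
    return snapshots;
  }

  public static void main(String[] args) {
    for (String s : run(Arrays.asList("b1", "b2", "b3"))) {
      System.out.println(s);
    }
  }
}
```

The printed snapshots match the list above (the fragment queue is printed oldest first, so `[b3,b2]` appears as `[b2, b3]`), followed by a final empty snapshot once b3 leaves opr-2. No batch is ever counted twice.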


On Sat, Apr 28, 2018 at 12:42 AM, salim achouche <sa...@gmail.com>
wrote:

> Another point: I don't see a functional benefit in avoiding a change of
> ownership for pass-through operators. Consider the following use cases:
>
> Example I -
> - A single batch of size 8MB is received at time t0 and then passed
> through a set of pass-through operators
> - At time t1 it is owned by operator Opr1, at time t2 by operator Opr2, and
> so forth
> - Assume we report memory usage at times t0 - t2; this is what will be seen
> - t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
> - t1: (fragment, opr-1, opr-2) = (0, 8MB, 0)
> - t2: (fragment, opr-1, opr-2) = (0, 0, 8MB)
>
> Example II -
> - Multiple batches of size 8MB are received at times t0 - t2 and then
> passed through a set of pass-through operators
> - At time t1 a batch is owned by operator Opr1, at time t2 by operator
> Opr2, and so forth
> - Assume we report memory usage at times t0 - t2; this is what will be seen
> - t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
> - t1: (fragment, opr-1, opr-2) = (8MB, 8MB, 0)
> - t2: (fragment, opr-1, opr-2) = (8MB, 8MB, 8MB)
>
>
> The key thing is that we clarify our reporting metrics so that users do
> not draw the wrong conclusions.
>
> Regards,
> Salim
>
> On Fri, Apr 27, 2018 at 11:47 PM, salim achouche <sa...@gmail.com>
> wrote:
>
>> Vlad,
>>
>> - My understanding is that operators need to take ownership of incoming
>> buffers (using the vector method transferTo())
>>
>> - My view is not that receivers are pass-through; instead, I feel that
>> sender & receiver operators should focus on their business logic
>>
>> - It just happens that the unordered-receiver does very little
>> (it deserializes the batch through the BatchLoader)
>>
>> - Contrast this with the merge-receiver, which needs to consume data from
>> multiple inputs to provide ordered batches
>>
>> - The operator implementation will dictate how many batches are consumed
>> (this should have nothing to do with communication concerns)
>>
>> - Intricacies of buffering, acking, back-pressuring, etc. are ideally left
>> to a communication module
>>
>>
>> My intent is to consistently report on resource usage (I am fine if we
>> exclude pass-through operators as long as we do it consistently). The next
>> enhancement that I am planning to do is to report on the fragment's
>> buffered batches. This will enable us to account for such resources when
>> analyzing memory usage.
>>
>> On Fri, Apr 27, 2018 at 9:50 PM, vrozov <gi...@git.apache.org> wrote:
>>
>>> Github user vrozov commented on the issue:
>>>
>>>     https://github.com/apache/drill/pull/1237
>>>
>>>     IMO, it would be good to understand what other operators do as well.
>>> For example, what do the Project or Filter operators do? Do they take ownership of
>>> incoming batches? And if they do, when is the ownership taken?
>>>
>>>     I do not suggest that we change how Sender and Receiver control
>>> **all** aspects of communication, at least not as part of this JIRA/PR. The
>>> difference between my approach and yours is whether or not UnorderedReceiver and
>>> other receivers are pass-through operators. My view is that receivers are
>>> not pass-through operators; they are buffering operators, as they receive
>>> batches from the network and buffer them until downstream operators are
>>> ready to consume those batches. In your view, receivers are pass-through
>>> operators that get batches from the fragment queue or some other queue and pass
>>> them downstream. As there is no wait and no processing between getting a
>>> batch from the fragment queue and passing it to the next operator, I don't see
>>> why a receiver needs to take ownership.
>>>
>>>
>>> ---
>>>
>>
>>
>

Re: [DISCUSS] batch ownership

Posted by Paul Rogers <pa...@yahoo.com.INVALID>.
I missed this as a discussion since it had the title of a GitHub discussion. Comments below.

    On Friday, April 27, 2018, 5:42:37 PM PDT, salim achouche <sa...@gmail.com> wrote:  
 
> Another point: I don't see a functional benefit from avoiding a change of
> ownership for pass-through operators.

Please read my responses to Vlad. Change of ownership is critical to how Drill's memory allocators work today. Of course, you are right that, if we could do a new design (perhaps based on the budget-based approach), we would not need the ownership stuff. But, without ownership changes now, the existing allocators will simply cause us all manner of problems. In particular, none of the spill logic added to Sort or HashAgg would work as they rely on a properly-functioning allocator.

> Consider the following use cases:
>
> Example I -
> - A single batch of size 8MB is received at time t0 and then passed
> through a set of pass-through operators
> - At time t1 it is owned by operator Opr1, at time t2 by operator Opr2, and so
> forth
> - Assume we report memory usage at times t0 - t2; this is what will be seen:
> - t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
> - t1: (fragment, opr-1, opr-2) = (0, 8MB, 0)
> - t2: (fragment, opr-1, opr-2) = (0, 0, 8MB)

You are right. Each minor fragment is single-threaded: only one operator is "active" at a time as control passes from downstream to upstream operators. (Yes, this is the unfortunate Drill terminology: downstream calls upstream, data flows in the direction opposite to calls.)
This single-threaded model is the insight behind the budget-based memory model. But to get there, we must consider the whole system; unfortunately, we can't just make localized changes.
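A minimal sketch of the pull model described above, assuming hypothetical `Operator`, `Scan`, and `PassThrough` types (not Drill's `RecordBatch` interface): the single calling thread invokes `next()` on the root, each call travels upstream, and each batch returns downstream, so exactly one operator is active at any moment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** A pull-based operator: downstream code calls next() on its upstream input. */
interface Operator {
  /** Returns the next batch, or null once the input is exhausted. */
  int[] next();
}

/** Leaf operator handing out pre-built batches (stands in for a scan or receiver). */
class Scan implements Operator {
  private final Iterator<int[]> batches;
  private final List<String> trace;

  Scan(List<int[]> batches, List<String> trace) {
    this.batches = batches.iterator();
    this.trace = trace;
  }

  @Override
  public int[] next() {
    trace.add("scan");
    return batches.hasNext() ? batches.next() : null;
  }
}

/** Pass-through operator: forwards whatever its upstream returns. */
class PassThrough implements Operator {
  private final String name;
  private final Operator upstream;
  private final List<String> trace;

  PassThrough(String name, Operator upstream, List<String> trace) {
    this.name = name;
    this.upstream = upstream;
    this.trace = trace;
  }

  @Override
  public int[] next() {
    trace.add(name);        // this operator is now the active one
    return upstream.next(); // the call goes upstream; the batch comes back downstream
  }
}

public class PullModel {
  public static void main(String[] args) {
    List<String> trace = new ArrayList<>();
    Operator scan = new Scan(Arrays.asList(new int[] {1, 2}, new int[] {3}), trace);
    Operator root = new PassThrough("opr-2", new PassThrough("opr-1", scan, trace), trace);
    int batches = 0;
    while (root.next() != null) { // one thread drives the whole chain
      batches++;
    }
    System.out.println(batches + " batches, call order: " + trace);
  }
}
```

On the two-batch input, `main` prints `2 batches, call order: [opr-2, opr-1, scan, opr-2, opr-1, scan, opr-2, opr-1, scan]`; the third round returns `null` and ends the loop.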

> Example II -
> - Multiple batches of size 8MB are received at times t0 - t2 and then
> passed through a set of pass-through operators
> - At time t1 a batch is owned by operator Opr1, at time t2 by operator Opr2, and so
> forth
> - Assume we report memory usage at times t0 - t2; this is what will be seen:
> - t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
> - t1: (fragment, opr-1, opr-2) = (8MB, 8MB, 0)
> - t2: (fragment, opr-1, opr-2) = (8MB, 8MB, 8MB)

The above can, AFAIK, never happen. A batch is owned by an operator, not a fragment. A batch passes up the operator tree until it reaches the top or until it reaches a "buffering" operator such as Sort.
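To make the contrast between pass-through and buffering operators concrete, here is a toy accounting model with hypothetical names. For brevity it drives batches from a simple loop rather than through the pull model, and `sort` only stands in for a buffering operator such as Sort. Ownership moves through `filter` one batch at a time but accumulates at `sort`.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model (hypothetical, not Drill code): where batch ownership accumulates. */
public class BufferingOwnership {
  /** Per-operator accounting: bytes currently owned and the peak over its lifetime. */
  static final class Op {
    long ownedBytes;
    long peakBytes;

    void take(long bytes) {
      ownedBytes += bytes;
      peakBytes = Math.max(peakBytes, ownedBytes);
    }

    void give(long bytes) {
      ownedBytes -= bytes;
    }
  }

  /** Sends each batch through a pass-through op into a buffering op; returns {passPeak, bufferPeak}. */
  static long[] run(List<Long> batchSizes) {
    Op filter = new Op(); // pass-through
    Op sort = new Op();   // buffering
    for (long size : batchSizes) {
      filter.take(size); // filter owns the batch while it processes it
      filter.give(size); // ...then hands it downstream
      sort.take(size);   // sort must keep it until all input has arrived
    }
    sort.give(sort.ownedBytes); // sorted output leaves; released here for brevity
    return new long[] {filter.peakBytes, sort.peakBytes};
  }

  public static void main(String[] args) {
    long mb = 1L << 20;
    long[] peaks = run(Arrays.asList(8 * mb, 8 * mb, 8 * mb));
    System.out.println("filter peak = " + peaks[0] / mb + "MB, sort peak = " + peaks[1] / mb + "MB");
  }
}
```

With three 8MB batches, `main` prints `filter peak = 8MB, sort peak = 24MB`.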


> The key thing is that we clarify our reporting metrics so that users do not
> draw the wrong conclusions.

This is a good thing. But we need to understand how the batches flow and report that accurately. Further, we must deeply understand this flow if we want to move to budget-based allocation without per-operator allocators.

Let's separate various concepts. First is the instantaneous "stats" maintained by each operator allocator to enforce memory limits. Second is the total data that has passed through an operator. Third is the maximum memory used at any one time over the life of the operator.

These are all very useful, but they measure different things.
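The three quantities can be tracked side by side, as in this sketch. `OperatorMemoryMetrics` and its methods are illustrative names, not Drill's `OperatorStats` API, and units are left abstract (read them as MB).

```java
/** Hypothetical sketch: three distinct memory metrics for one operator. */
public class OperatorMemoryMetrics {
  private long currentBytes;    // instantaneous allocation (what a limit check would use)
  private long cumulativeBytes; // total data that has passed through the operator
  private long peakBytes;       // maximum held at any one time over the operator's life

  void acquire(long bytes) {
    currentBytes += bytes;
    cumulativeBytes += bytes;
    peakBytes = Math.max(peakBytes, currentBytes);
  }

  void release(long bytes) {
    currentBytes -= bytes; // cumulative and peak are unaffected by a release
  }

  long current() { return currentBytes; }
  long cumulative() { return cumulativeBytes; }
  long peak() { return peakBytes; }

  public static void main(String[] args) {
    OperatorMemoryMetrics m = new OperatorMemoryMetrics();
    for (int i = 0; i < 3; i++) { // three 8MB batches pass through one at a time
      m.acquire(8);
      m.release(8);
    }
    System.out.println(m.current() + " " + m.cumulative() + " " + m.peak());
  }
}
```

For three 8MB batches passing through one at a time, `main` prints `0 24 8`: nothing held now, 24MB seen in total, 8MB at peak.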

Thanks,
- Paul

  

Re: [GitHub] drill issue #1237: DRILL-6348: Fixed code so that Unordered Receiver reports...

Posted by salim achouche <sa...@gmail.com>.
Another point: I don't see a functional benefit from avoiding a change of
ownership for pass-through operators. Consider the following use cases:

Example I -
- A single batch of size 8MB is received at time t0 and then passed
through a set of pass-through operators
- At time t1 it is owned by operator Opr1, at time t2 by operator Opr2, and so
forth
- Assume we report memory usage at times t0 - t2; this is what will be seen:
- t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
- t1: (fragment, opr-1, opr-2) = (0, 8MB, 0)
- t2: (fragment, opr-1, opr-2) = (0, 0, 8MB)

Example II -
- Multiple batches of size 8MB are received at times t0 - t2 and then
passed through a set of pass-through operators
- At time t1 a batch is owned by operator Opr1, at time t2 by operator Opr2, and so
forth
- Assume we report memory usage at times t0 - t2; this is what will be seen:
- t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
- t1: (fragment, opr-1, opr-2) = (8MB, 8MB, 0)
- t2: (fragment, opr-1, opr-2) = (8MB, 8MB, 8MB)


The key thing is that we clarify our reporting metrics so that users do not
draw the wrong conclusions.

Regards,
Salim


Re: [GitHub] drill issue #1237: DRILL-6348: Fixed code so that Unordered Receiver reports...

Posted by salim achouche <sa...@gmail.com>.
Vlad,

- My understanding is that operators need to take ownership of incoming
buffers (using the vector method transferTo())

- My view is not that receivers are pass-through; instead, I feel that
sender & receiver operators should focus on their business logic

- It just happens that the unordered-receiver does very little
(it deserializes the batch through the BatchLoader)

- Contrast this with the merge-receiver, which needs to consume data from
multiple inputs to provide ordered batches

- The operator implementation will dictate how many batches are consumed
(this should have nothing to do with communication concerns)

- Intricacies of buffering, acking, back-pressuring, etc. are ideally left to
a communication module


My intent is to consistently report on resource usage (I am fine if we
exclude pass-through operators as long as we do it consistently). The next
enhancement that I am planning to do is to report on the fragment's buffered
batches. This will enable us to account for such resources when analyzing
memory usage.


[GitHub] drill issue #1237: DRILL-6348: Fixed code so that Unordered Receiver reports...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/drill/pull/1237
  
    IMO, it would be good to understand what other operators do as well. For example, what do the Project or Filter operators do? Do they take ownership of incoming batches? And if they do, when is the ownership taken?
    
    I do not suggest that we change how Sender and Receiver control **all** aspects of communication, at least not as part of this JIRA/PR. The difference between my approach and yours is whether or not UnorderedReceiver and other receivers are pass-through operators. My view is that receivers are not pass-through operators; they are buffering operators, as they receive batches from the network and buffer them until downstream operators are ready to consume those batches. In your view, receivers are pass-through operators that get batches from the fragment queue or some other queue and pass them downstream. As there is no wait and no processing between getting a batch from the fragment queue and passing it to the next operator, I don't see why a receiver needs to take ownership. 


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184197379
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -153,8 +153,10 @@ private RawFragmentBatch getNextBatch() throws IOException {
       public IterOutcome next() {
         batchLoader.resetRecordCount();
         stats.startProcessing();
    +
    +    RawFragmentBatch batch = null;
         try{
    -      RawFragmentBatch batch;
    +
    --- End diff --
    
    will do. Thanks for the suggestion.


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184050305
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -182,13 +184,18 @@ public IterOutcome next() {
             return IterOutcome.OUT_OF_MEMORY;
           }
     
    +      // Transfer the ownership of this raw-batch to this operator for proper memory statistics reporting
    +      batch = batch.transferBodyOwnership(oContext.getAllocator());
    +
           final RecordBatchDef rbd = batch.getHeader().getDef();
           final boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
           // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
           // SchemaChangeException, so check/clean catch clause below.
           stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount());
     
           batch.release();
    +      batch = null;
    --- End diff --
    
    Can it be now handled in `finally`?
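A minimal sketch of releasing the batch only in `finally`, using a hypothetical reference-counted `Batch` in place of `RawFragmentBatch`: every exit path, normal or exceptional, reaches the single `release()` call, and the null check covers the case where no batch was received. This is one way to address the question, not the code that was merged.

```java
/** Hypothetical sketch: a single release point in finally. */
public class ReleaseInFinally {
  /** Stand-in for a reference-counted batch; release() fails on a double release. */
  static final class Batch {
    int refCount = 1;

    void release() {
      if (refCount <= 0) {
        throw new IllegalStateException("double release");
      }
      refCount--;
    }
  }

  /** Processes a batch; every exit path funnels through the release in finally. */
  static boolean process(Batch batch, boolean failLoad) {
    try {
      if (failLoad) {
        throw new RuntimeException("simulated load failure");
      }
      // ... load the batch body and record stats here ...
      return true;
    } catch (RuntimeException e) {
      // in the reviewed diff, this path reports the failure and returns STOP
      return false;
    } finally {
      if (batch != null) { // getNextBatch() may return null at end of input
        batch.release();
      }
    }
  }

  public static void main(String[] args) {
    Batch ok = new Batch();
    Batch bad = new Batch();
    System.out.println(process(ok, false) + " " + ok.refCount);
    System.out.println(process(bad, true) + " " + bad.refCount);
  }
}
```

`main` prints `true 0` and then `false 0`: the batch ends with a zero reference count on both paths.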


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184804819
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
     
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    +    RawFragmentBatch batch;
    +    try {
    +      stats.startWait();
    --- End diff --
    
    it may throw `AssertException` now and other exceptions may be added in the future.
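One reading of this comment is that `stats.startWait()` should be called before the `try`, so that `finally` only ever stops a wait that actually started. The sketch below uses a hypothetical `WaitTimer` that throws `IllegalStateException` on unbalanced calls; it stands in for Drill's operator stats and is not their real implementation.

```java
import java.util.function.Supplier;

/** Hypothetical wait timer that rejects unbalanced start/stop calls. */
public class WaitTimer {
  private boolean waiting;
  private int completedWaits;

  void startWait() {
    if (waiting) {
      throw new IllegalStateException("startWait() called while already waiting");
    }
    waiting = true;
  }

  void stopWait() {
    if (!waiting) {
      throw new IllegalStateException("stopWait() without a matching startWait()");
    }
    waiting = false;
    completedWaits++;
  }

  boolean isWaiting() { return waiting; }
  int completedWaits() { return completedWaits; }

  /** Times a blocking fetch; start is outside try, so stop runs only after a successful start. */
  static <T> T timed(WaitTimer stats, Supplier<T> fetch) {
    stats.startWait(); // if this throws, there is nothing to stop
    try {
      return fetch.get();
    } finally {
      stats.stopWait(); // paired with the successful startWait() above
    }
  }

  public static void main(String[] args) {
    WaitTimer stats = new WaitTimer();
    String batch = timed(stats, () -> "batch-1");
    System.out.println(batch + " " + stats.completedWaits() + " " + stats.isWaiting());
  }
}
```

`main` prints `batch-1 1 false`. If the fetch throws, `finally` still calls `stopWait()`, so the timer is never left running.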


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184070218
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java ---
    @@ -77,4 +83,46 @@ public long getByteCount() {
       public boolean isAckSent() {
         return ackSent.get();
       }
    +
    +  /**
    +   * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory
    +   * accounting (that is, the operator should be charged with the body's Drillbuf memory).
    +   *
    +   * <p><b>NOTES -</b>
    +   * <ul>
    +   * <li>This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the
    +   *     owning allocator or b) the target allocator is already the owner
    +   * <li>When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper
    +   *     DrillBuf reference count accounting
    +   * <li>The RPC handling code caches a reference to this RawFragmentBatch object instance; release()
    +   *     calls should be routed to the previous DrillBuf
    +   * </ul>
    +   *
    +   * @param targetAllocator target allocator
    +   * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has
    +   *         been switched to the target allocator); otherwise this operation is a NOOP (current instance
    +   *         returned)
    +   */
    +  public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) {
    +    if (body == null) {
    +      return this; // NOOP
    +    }
    +
    +    if (!body.getLedger().isOwningLedger()
    +     || body.getLedger().isOwner(targetAllocator)) {
    +
    +      return this;
    +    }
    +
    +    int writerIndex               = body.writerIndex();
    +    TransferResult transferResult = body.transferOwnership(targetAllocator);
    +
    +    // Set the index and increment reference count
    +    transferResult.buffer.writerIndex(writerIndex);
    +
    +    // Clear the current Drillbuffer since caller will perform release() on the new one
    +    body.release();
    +
    +    return new RawFragmentBatch(getHeader(), transferResult.buffer, getSender(), false);
    --- End diff --
    
    Why is it necessary to return new `RawFragmentBatch` instead of setting `body` to `transferResult.buffer`?


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by HanumathRao <gi...@git.apache.org>.
Github user HanumathRao commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r183886561
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -182,13 +184,18 @@ public IterOutcome next() {
             return IterOutcome.OUT_OF_MEMORY;
           }
     
    +      // Transfer the ownership of this raw-batch to this operator for proper memory statistics reporting
    +      batch = batch.transferBodyOwnership(oContext.getAllocator());
    --- End diff --
    
    @sachouche  Thank you for the changes. I was wondering whether the change below,
    `batch = batch.transferBodyOwnership(oContext.getAllocator());`,
    needs to be moved before the check at line 183.



---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184154997
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java ---
    @@ -77,4 +83,46 @@ public long getByteCount() {
       public boolean isAckSent() {
         return ackSent.get();
       }
    +
    +  /**
    +   * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory
    +   * accounting (that is, the operator should be charged with the body's Drillbuf memory).
    +   *
    +   * <p><b>NOTES -</b>
    +   * <ul>
    +   * <li>This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the
    +   *     owning allocator or b) the target allocator is already the owner
    +   * <li>When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper
    +   *     DrillBuf reference count accounting
    +   * <li>The RPC handling code caches a reference to this RawFragmentBatch object instance; release()
    +   *     calls should be routed to the previous DrillBuf
    +   * </ul>
    +   *
    +   * @param targetAllocator target allocator
    +   * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has
    +   *         been switched to the target allocator); otherwise this operation is a NOOP (current instance
    +   *         returned)
    +   */
    +  public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) {
    +    if (body == null) {
    +      return this; // NOOP
    +    }
    +
    +    if (!body.getLedger().isOwningLedger()
    +     || body.getLedger().isOwner(targetAllocator)) {
    +
    +      return this;
    +    }
    +
    +    int writerIndex               = body.writerIndex();
    +    TransferResult transferResult = body.transferOwnership(targetAllocator);
    +
    +    // Set the index and increment reference count
    +    transferResult.buffer.writerIndex(writerIndex);
    +
    +    // Clear the current Drillbuffer since caller will perform release() on the new one
    +    body.release();
    +
    +    return new RawFragmentBatch(getHeader(), transferResult.buffer, getSender(), false);
    --- End diff --
    
    I don't see where RawFragmentBatch is cached. Isn't it removed from a queue using poll()?


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184049558
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -201,6 +208,11 @@ public IterOutcome next() {
           context.getExecutorState().fail(ex);
           return IterOutcome.STOP;
         } finally {
    +
    +      if (batch != null) {
    +        batch.release();
    +        batch = null;
    --- End diff --
    
    Why is this necessary?


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184138876
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java ---
    @@ -77,4 +83,46 @@ public long getByteCount() {
       public boolean isAckSent() {
         return ackSent.get();
       }
    +
    +  /**
    +   * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory
    +   * accounting (that is, the operator should be charged with the body's Drillbuf memory).
    +   *
    +   * <p><b>NOTES -</b>
    +   * <ul>
    +   * <li>This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the
    +   *     owning allocator or b) the target allocator is already the owner
    +   * <li>When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper
    +   *     DrillBuf reference count accounting
    +   * <li>The RPC handling code caches a reference to this RawFragmentBatch object instance; release()
    +   *     calls should be routed to the previous DrillBuf
    +   * </ul>
    +   *
    +   * @param targetAllocator target allocator
    +   * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has
    +   *         been switched to the target allocator); otherwise this operation is a NOOP (current instance
    +   *         returned)
    +   */
    +  public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) {
    +    if (body == null) {
    +      return this; // NOOP
    +    }
    +
    +    if (!body.getLedger().isOwningLedger()
    +     || body.getLedger().isOwner(targetAllocator)) {
    +
    +      return this;
    +    }
    +
    +    int writerIndex               = body.writerIndex();
    +    TransferResult transferResult = body.transferOwnership(targetAllocator);
    +
    +    // Set the index and increment reference count
    +    transferResult.buffer.writerIndex(writerIndex);
    +
    +    // Clear the current Drillbuffer since caller will perform release() on the new one
    +    body.release();
    +
    +    return new RawFragmentBatch(getHeader(), transferResult.buffer, getSender(), false);
    --- End diff --
    
    That was my original code which failed miserably:
    - The RPC code has references on the DrillBuf and the RawFragmentBatch
    - This means, we need to ensure that a release call is routed to the right DrillBuf (otherwise, the reference count logic stops working)
    - Creating a new RawFragmentBatch instance essentially provided just that (proper reference count accounting)
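The reference-counting argument can be illustrated with a toy model. `Buf` and `Batch` are hypothetical stand-ins for `DrillBuf` and `RawFragmentBatch`, and the starting count of 2 on the old buffer (one for the RPC layer, one for the operator) is an assumption made for illustration. Swapping the body in place sends the RPC layer's deferred `release()` to the new buffer; wrapping the new buffer in a fresh batch keeps each release aimed at the buffer it was counted against.

```java
/** Toy reference-counting model (hypothetical classes, not Drill's). */
public class WhyNewBatch {
  /** Stand-in for a reference-counted DrillBuf. */
  static final class Buf {
    int refCount;
    Buf(int refCount) { this.refCount = refCount; }
    void release() { refCount--; }
  }

  /** Stand-in for RawFragmentBatch: release() goes to whatever body it currently holds. */
  static final class Batch {
    Buf body;
    Batch(Buf body) { this.body = body; }
    void release() { body.release(); }
  }

  /** Swaps the body in place; returns {oldRefCount, movedRefCount} after both releases. */
  static int[] mutateInPlace() {
    Buf old = new Buf(2);          // one count for the RPC layer, one for the operator
    Batch cached = new Batch(old); // the RPC layer keeps this reference
    Buf moved = new Buf(1);        // buffer produced by the ownership transfer
    old.release();                 // operator drops its count on the old buffer
    cached.body = moved;           // ...and swaps the body in place
    cached.release();              // operator's release: moved -> 0
    cached.release();              // RPC layer's release also lands on moved
    return new int[] {old.refCount, moved.refCount};
  }

  /** Wraps the moved buffer in a new batch; returns {oldRefCount, movedRefCount}. */
  static int[] newInstance() {
    Buf old = new Buf(2);
    Batch cached = new Batch(old);
    Buf moved = new Buf(1);
    old.release();
    Batch fresh = new Batch(moved); // the operator works with a new batch object
    fresh.release();                // operator's release: moved -> 0
    cached.release();               // RPC layer's release still reaches old -> 0
    return new int[] {old.refCount, moved.refCount};
  }

  public static void main(String[] args) {
    int[] a = mutateInPlace();
    int[] b = newInstance();
    System.out.println("mutate in place: old=" + a[0] + " moved=" + a[1]);
    System.out.println("new instance:    old=" + b[0] + " moved=" + b[1]);
  }
}
```

`main` prints `old=1 moved=-1` for the in-place swap (old buffer leaked, new one over-released) and `old=0 moved=0` for the new instance.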


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184114148
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -153,8 +153,10 @@ private RawFragmentBatch getNextBatch() throws IOException {
       public IterOutcome next() {
         batchLoader.resetRecordCount();
         stats.startProcessing();
    +
    +    RawFragmentBatch batch = null;
         try{
    -      RawFragmentBatch batch;
    +
    --- End diff --
    
    Can you clarify your ask?


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184075517
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java ---
    @@ -77,4 +83,46 @@ public long getByteCount() {
       public boolean isAckSent() {
         return ackSent.get();
       }
    +
    +  /**
    +   * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory
    +   * accounting (that is, the operator should be charged with the body's Drillbuf memory).
    +   *
    +   * <p><b>NOTES -</b>
    +   * <ul>
    +   * <li>This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the
    +   *     owning allocator or b) the target allocator is already the owner
    +   * <li>When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper
    +   *     DrillBuf reference count accounting
    +   * <li>The RPC handling code caches a reference to this RawFragmentBatch object instance; release()
    +   *     calls should be routed to the previous DrillBuf
    +   * </ul>
    +   *
    +   * @param targetAllocator target allocator
    +   * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has
    +   *         been switched to the target allocator); otherwise this operation is a NOOP (current instance
    +   *         returned)
    +   */
    +  public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) {
    +    if (body == null) {
    +      return this; // NOOP
    +    }
    +
    +    if (!body.getLedger().isOwningLedger()
    +     || body.getLedger().isOwner(targetAllocator)) {
    +
    +      return this;
    +    }
    +
    +    int writerIndex               = body.writerIndex();
    +    TransferResult transferResult = body.transferOwnership(targetAllocator);
    --- End diff --
    
    There is no check that the transfer is within the new allocator's limits; is it guaranteed to succeed?
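One possible contract, sketched with hypothetical types rather than Drill's allocator API: because the memory already exists, the transfer always moves the charge, and it only reports whether the target stayed within its limit. That leaves the OOM decision to the caller, which matches the note "Caller is responsible for checking an OOM condition" added to the javadoc later in this thread.

```java
/** Hypothetical sketch: ownership transfer that reports, rather than refuses, a limit overrun. */
public class BoundedTransfer {
  /** Toy allocator with a limit; not Drill's BufferAllocator. */
  static final class Allocator {
    final long limit;
    long allocated;
    Allocator(long limit) { this.limit = limit; }
  }

  /**
   * Moves the charge for an existing buffer and returns whether the target stayed within
   * its limit. The memory already exists, so the transfer itself is never refused.
   */
  static boolean transfer(Allocator from, Allocator to, long bytes) {
    from.allocated -= bytes;
    to.allocated += bytes;
    return to.allocated <= to.limit;
  }

  public static void main(String[] args) {
    Allocator fragment = new Allocator(Long.MAX_VALUE);
    fragment.allocated = 8;
    Allocator operator = new Allocator(4); // operator limit smaller than the batch
    boolean fits = transfer(fragment, operator, 8);
    System.out.println(fits ? "within limit" : "over limit: caller must handle the OOM condition");
  }
}
```

`main` prints `over limit: caller must handle the OOM condition`; a caller in the receiver could, for example, return `IterOutcome.OUT_OF_MEMORY` in that case.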


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184159218
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -153,8 +153,10 @@ private RawFragmentBatch getNextBatch() throws IOException {
       public IterOutcome next() {
         batchLoader.resetRecordCount();
         stats.startProcessing();
    +
    +    RawFragmentBatch batch = null;
         try{
    -      RawFragmentBatch batch;
    +
    --- End diff --
    
    Create a function `getNextNotEmptyBatch()` that calls `getNextBatch()` and returns either a non-empty batch or `null`.
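A simplified sketch of the suggested helper, assuming a hypothetical `RawBatch` with only a record count and a `Supplier` standing in for `getNextBatch()`. The real skip condition in the PR checks more than the record count, and release handling is left out of this sketch.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

/** Hypothetical, simplified version of getNextNotEmptyBatch(). */
public class NotEmptyBatch {
  /** Stand-in for RawFragmentBatch; only the record count matters here. */
  static final class RawBatch {
    final int recordCount;
    RawBatch(int recordCount) { this.recordCount = recordCount; }
  }

  /** Pulls until a batch with records arrives; returns null when the input is exhausted. */
  static RawBatch getNextNotEmptyBatch(Supplier<RawBatch> getNextBatch) {
    RawBatch batch = getNextBatch.get();
    // empty batches are basically control messages, so skip over them
    while (batch != null && batch.recordCount == 0) {
      batch = getNextBatch.get();
    }
    return batch;
  }

  public static void main(String[] args) {
    Iterator<RawBatch> queue =
        Arrays.asList(new RawBatch(0), new RawBatch(3), new RawBatch(0)).iterator();
    Supplier<RawBatch> source = () -> queue.hasNext() ? queue.next() : null;
    RawBatch first = getNextNotEmptyBatch(source);
    System.out.println(first.recordCount + " " + getNextNotEmptyBatch(source));
  }
}
```

`main` prints `3 null`: the empty batches on either side of the three-record batch are skipped, and the exhausted input yields `null`.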


---

[GitHub] drill issue #1237: DRILL-6348: Fixed code so that Unordered Receiver reports...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on the issue:

    https://github.com/apache/drill/pull/1237
  
    That was not my intention, as my current change aims to describe the system as it is. 
    
    @parthchandra, any feedback?


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184112400
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -153,8 +153,10 @@ private RawFragmentBatch getNextBatch() throws IOException {
       public IterOutcome next() {
         batchLoader.resetRecordCount();
         stats.startProcessing();
    +
    +    RawFragmentBatch batch = null;
    --- End diff --
    
    The release logic is a NOOP if the body is null; the while loop is to guard against an empty batch.


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r185063074
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java ---
    @@ -77,4 +83,47 @@ public long getByteCount() {
       public boolean isAckSent() {
         return ackSent.get();
       }
    +
    +  /**
    +   * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory
    +   * accounting (that is, the operator should be charged with the body's Drillbuf memory).
    +   *
    +   * <p><b>NOTES -</b>
    +   * <ul>
    +   * <li>This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the
    +   *     owning allocator or b) the target allocator is already the owner
    +   * <li>When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper
    +   *     DrillBuf reference count accounting
    +   * <li>The RPC handling code caches a reference to this RawFragmentBatch object instance; release()
    +   *     calls should be routed to the previous DrillBuf
    +   * <li>Caller is responsible for checking an OOM condition
    +   * </ul>
    +   *
    +   * @param targetAllocator target allocator
    +   * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has
    +   *         been switched to the target allocator); otherwise this operation is a NOOP (current instance
    +   *         returned)
    +   */
    +  public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) {
    --- End diff --
    
    DrillBuf already has a transferOwnership method; could it be used instead? See the transferTo methods in any of the vector classes, e.g. [FixedValueVectors.transferTo(....)](https://github.com/apache/drill/blob/master/exec/vector/src/main/codegen/templates/FixedValueVectors.java#L282).
    That saves you the trouble of rolling your own and also avoids changes to any of the existing classes.
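To make the suggestion concrete, here is a minimal sketch in a toy model. All class names are hypothetical; this is not Drill's `DrillBuf`/`BufferAllocator` API. The buffer exposes its own `transferOwnership()`, and a `transferTo()`-style helper on the batch simply delegates to it instead of constructing a new batch object.

```java
// Hypothetical, simplified model of buffer ownership; these are NOT Drill's classes.
class ToyAllocator {
  final String name;
  final long limit;
  long allocated;

  ToyAllocator(String name, long limit) {
    this.name = name;
    this.limit = limit;
  }

  // Always accounts the bytes; returns false when the limit is exceeded.
  boolean forceAllocate(long size) {
    allocated += size;
    return allocated <= limit;
  }

  void releaseBytes(long size) {
    allocated -= size;
  }
}

class ToyBuffer {
  final long size;
  ToyAllocator owner;

  ToyBuffer(ToyAllocator owner, long size) {
    this.owner = owner;
    this.size = size;
    owner.forceAllocate(size);
  }

  // Buffer-level transfer: move the accounting from the current owner to the target.
  boolean transferOwnership(ToyAllocator target) {
    if (owner == target) {
      return true; // already owned by the target: nothing to do
    }
    boolean fit = target.forceAllocate(size);
    owner.releaseBytes(size);
    owner = target;
    return fit;
  }
}

class SketchBatch {
  ToyBuffer body;

  SketchBatch(ToyBuffer body) {
    this.body = body;
  }

  // transferTo-style helper: reuse the buffer's own transfer instead of
  // wrapping a new buffer in a second batch object.
  boolean transferTo(ToyAllocator target) {
    return body == null || body.transferOwnership(target);
  }
}
```

The batch object keeps its identity, so nothing that caches a reference to it has to be rerouted.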


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184574405
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
     
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    +    RawFragmentBatch batch;
    +    try {
    +      stats.startWait();
    +      batch = getNextBatch();
    +
    +      // skip over empty batches. we do this since these are basically control messages.
    +      while (batch != null && batch.getHeader().getDef().getRecordCount() == 0
    --- End diff --
    
    Sure. 
    
    FYI - We cannot release the batch within this method.


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184151186
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -182,13 +184,18 @@ public IterOutcome next() {
             return IterOutcome.OUT_OF_MEMORY;
           }
     
    +      // Transfer the ownership of this raw-batch to this operator for proper memory statistics reporting
    +      batch = batch.transferBodyOwnership(oContext.getAllocator());
    +
           final RecordBatchDef rbd = batch.getHeader().getDef();
           final boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
           // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
           // SchemaChangeException, so check/clean catch clause below.
           stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount());
     
           batch.release();
    +      batch = null;
    --- End diff --
    
    @vrozov 
    
    My bad: the highlight was on the `batch = null` line, so I guess you meant: why can't we move the whole release logic into the finally block? I agree with your proposal, as slightly delaying the release doesn't hurt us in this case. I will make the change.
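Here is a minimal sketch of the proposal, assuming a toy reference-counted batch. The classes and the `"OK"`/`"STOP"` strings are hypothetical stand-ins for `RawFragmentBatch` and `IterOutcome`. The only `release()` call sits in the `finally` block, so it runs exactly once whether loading succeeds or throws.

```java
// Hypothetical stand-in for a received batch with a reference count.
class CountedBatch {
  int refCount = 1;

  void release() {
    if (refCount <= 0) {
      throw new IllegalStateException("batch released twice");
    }
    refCount--;
  }
}

class FinallyOnlyReceiver {
  // Sketch of next(): the single release lives in the finally block, so it
  // runs exactly once on both the success and the failure path.
  static String next(CountedBatch batch, boolean failDuringLoad) {
    try {
      if (failDuringLoad) {
        throw new IllegalStateException("simulated load failure");
      }
      return "OK";
    } catch (IllegalStateException e) {
      return "STOP";
    } finally {
      if (batch != null) {
        batch.release();
      }
    }
  }
}
```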


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184155724
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java ---
    @@ -77,4 +83,46 @@ public long getByteCount() {
       public boolean isAckSent() {
         return ackSent.get();
       }
    +
    +  /**
    +   * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory
    +   * accounting (that is, the operator should be charged with the body's Drillbuf memory).
    +   *
    +   * <p><b>NOTES -</b>
    +   * <ul>
    +   * <li>This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the
    +   *     owning allocator or b) the target allocator is already the owner
    +   * <li>When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper
    +   *     DrillBuf reference count accounting
    +   * <li>The RPC handling code caches a reference to this RawFragmentBatch object instance; release()
    +   *     calls should be routed to the previous DrillBuf
    +   * </ul>
    +   *
    +   * @param targetAllocator target allocator
    +   * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has
    +   *         been switched to the target allocator); otherwise this operation is a NOOP (current instance
    +   *         returned)
    +   */
    +  public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) {
    +    if (body == null) {
    +      return this; // NOOP
    +    }
    +
    +    if (!body.getLedger().isOwningLedger()
    +     || body.getLedger().isOwner(targetAllocator)) {
    +
    +      return this;
    +    }
    +
    +    int writerIndex               = body.writerIndex();
    +    TransferResult transferResult = body.transferOwnership(targetAllocator);
    --- End diff --
    
    But what if it goes over the limit?


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184197278
  
    --- Diff: exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java ---
    @@ -253,10 +261,12 @@ public boolean transferBalance(final BufferLedger target) {
               target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name);
             }
     
    -        boolean overlimit = target.allocator.forceAllocate(size);
    +        // Release first to handle the case where the current and target allocators were part of the same
    +        // parent / child tree.
             allocator.releaseBytes(size);
    +        boolean allocationFit = target.allocator.forceAllocate(size);
    --- End diff --
    
    What about debugging, when:
    - An operation fails
    - And yet there is a change of ownership
    
    @vrozov, I'd rather not make this change, as I strongly believe the change of ownership should happen only on success.


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184554299
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
     
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    --- End diff --
    
    Consider handling `IOException` inside `getNextNotEmptyBatch()`. 


---

[GitHub] drill issue #1237: DRILL-6348: Fixed code so that Unordered Receiver reports...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/drill/pull/1237
  
    @sachouche I'd suggest moving the discussion to the dev list, as the topic of batch ownership goes beyond a PR review (code changes).


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184117938
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -182,13 +184,18 @@ public IterOutcome next() {
             return IterOutcome.OUT_OF_MEMORY;
           }
     
    +      // Transfer the ownership of this raw-batch to this operator for proper memory statistics reporting
    +      batch = batch.transferBodyOwnership(oContext.getAllocator());
    +
           final RecordBatchDef rbd = batch.getHeader().getDef();
           final boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
           // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
           // SchemaChangeException, so check/clean catch clause below.
           stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount());
     
           batch.release();
    +      batch = null;
    --- End diff --
    
    Not sure what you mean, but this is the goal of the current code:
    - Once a batch is set, we need to decrease its ref count by one before the next() method returns
    - If an exception happens before the release call, the finally block releases the batch, since the reference is still non-null
    - Otherwise, the release is performed and the reference is set to null, which disables the release within the finally block
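The three points above can be sketched with a toy batch. The names are hypothetical, not Drill's classes. Releasing early and nulling the local means the `finally` block only releases on the exception path. A double release would trip the guard in `release()`.

```java
// Hypothetical stand-in for a received batch with a reference count.
class TrackedBatch {
  int refCount = 1;

  void release() {
    if (refCount <= 0) {
      throw new IllegalStateException("batch released twice");
    }
    refCount--;
  }
}

class NullOutReceiver {
  // Release as soon as the batch is consumed and null the local, so the
  // finally block only releases when an exception skipped the normal release.
  static String next(TrackedBatch received, boolean failBeforeRelease) {
    TrackedBatch batch = received;
    try {
      if (failBeforeRelease) {
        throw new IllegalStateException("simulated load failure");
      }
      batch.release();
      batch = null; // disables the release in the finally block
      return "OK";
    } catch (IllegalStateException e) {
      return "STOP";
    } finally {
      if (batch != null) {
        batch.release();
      }
    }
  }
}
```

Functionally this matches releasing only in `finally`. The difference is that the reference is dropped earlier on the success path.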


---

[GitHub] drill issue #1237: DRILL-6348: Fixed code so that Unordered Receiver reports...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on the issue:

    https://github.com/apache/drill/pull/1237
  
    @vrozov,
    
    **What are we trying to solve / improve**
    - Drill is currently not properly reporting memory held in fragments' receive queues
    - This makes it hard to analyze OOM conditions
    
    This is what I want to see:
    - Every operator reporting on the resources it is currently using (needed)
    - Fragment-held resources (other than the ones already reported by the child operators)
    - Drillbit-level resources (metadata caches, web server, ...)
    - I am OK with reaching this goal incrementally
    
    **Data Exchange Logistics**
    - Ideally, the data exchange fabric should be decoupled from the Drill Receive / Send operators
    - The fabric should handle all aspects of prefetching, back-pressure and so forth
    - It will adapt to the speed of producers / consumers as they write / read data
    - This infrastructure should have its own resource management and reporting capabilities
    
    **Operator-based Reporting**
    - Receive and Send operators should not worry about batches they haven't consumed yet
    - Doing so is counterproductive, as the data exchange fabric will interpret a "drain" operation as the operator "needing" more data
    - For example, the merge-receiver should not be managing the receive queues; it should only advertise its data consumption pattern and let the exchange fabric figure out the rest
    
    The main difference between the two approaches is that you are advocating for Receive and Send operators to control all aspects of communication, whereas I am advocating for decoupling those aspects.
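As a rough illustration of the decoupled design, here is a hypothetical fabric-owned receive queue. It is not an existing Drill component.
- Bytes waiting in the queue are charged to the fabric.
- Bytes of batches the operator actually took are charged to the operator.
- A full queue signals back-pressure by refusing `offer()`.

Batch sizes are modelled as plain `long` byte counts.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical exchange-fabric queue that owns (and reports) the bytes of
// batches that have arrived but have not yet been consumed by an operator.
class FabricQueue {
  private final Deque<Long> pending = new ArrayDeque<>();
  private final long capacityBytes;
  private long heldBytes;

  FabricQueue(long capacityBytes) {
    this.capacityBytes = capacityBytes;
  }

  // Producer side: returns false when full, i.e. the sender should back off.
  boolean offer(long batchBytes) {
    if (heldBytes + batchBytes > capacityBytes) {
      return false;
    }
    pending.addLast(batchBytes);
    heldBytes += batchBytes;
    return true;
  }

  // Consumer side: hands the next batch to the operator; null if none is queued.
  Long poll() {
    Long batchBytes = pending.pollFirst();
    if (batchBytes != null) {
      heldBytes -= batchBytes;
    }
    return batchBytes;
  }

  long heldBytes() {
    return heldBytes;
  }
}

// Hypothetical receive operator: only charged for batches it actually took.
class ReceiveOperator {
  long heldBytes;

  boolean consumeOne(FabricQueue queue) {
    Long batchBytes = queue.poll();
    if (batchBytes == null) {
      return false;
    }
    heldBytes += batchBytes;
    return true;
  }
}
```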


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184747702
  
    --- Diff: exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java ---
    @@ -253,10 +261,12 @@ public boolean transferBalance(final BufferLedger target) {
               target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name);
             }
     
    -        boolean overlimit = target.allocator.forceAllocate(size);
    +        // Release first to handle the case where the current and target allocators were part of the same
    +        // parent / child tree.
             allocator.releaseBytes(size);
    +        boolean allocationFit = target.allocator.forceAllocate(size);
    --- End diff --
    
    - The change of order is an optimization for the parent / child relationship: if we don't release first, we could unnecessarily go over the memory budget (double counting).
    - The force-alloc() / free() failures should never happen under normal conditions; when they do, the best thing to do is to exit. I still prefer not to promote the target allocator until the transfer is 100% successful.
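The double-counting argument can be checked with a toy allocator tree. The classes are hypothetical, loosely modelled on the `forceAllocate()`/`releaseBytes()` calls in the diff.

In this setup, a 600-byte buffer owned by a fragment allocator with a 1000-byte limit is moved to a child operator allocator:
- Allocating first briefly charges the parent 1200 bytes and reports over-limit.
- Releasing first never exceeds 600 bytes.

In both variants the owner is switched only after the accounting steps complete.

```java
// Hypothetical allocator tree: a child's usage is also charged to its parent.
class TreeAllocator {
  final TreeAllocator parent;
  final long limit;
  long used;
  long peak;

  TreeAllocator(TreeAllocator parent, long limit) {
    this.parent = parent;
    this.limit = limit;
  }

  // Always accounts the bytes; returns false if this allocator or an ancestor is over its limit.
  boolean forceAllocate(long size) {
    used += size;
    peak = Math.max(peak, used);
    boolean parentFit = parent == null || parent.forceAllocate(size);
    return used <= limit && parentFit;
  }

  void releaseBytes(long size) {
    used -= size;
    if (parent != null) {
      parent.releaseBytes(size);
    }
  }
}

class OwnedBuffer {
  final long size;
  TreeAllocator owner;

  OwnedBuffer(TreeAllocator owner, long size) {
    this.owner = owner;
    this.size = size;
    owner.forceAllocate(size);
  }
}

class TransferOrder {
  // Moves the accounting for buf to target; the owner is switched last.
  static boolean transfer(OwnedBuffer buf, TreeAllocator target, boolean releaseFirst) {
    boolean fit;
    if (releaseFirst) {
      buf.owner.releaseBytes(buf.size);
      fit = target.forceAllocate(buf.size);
    } else {
      fit = target.forceAllocate(buf.size);
      buf.owner.releaseBytes(buf.size);
    }
    buf.owner = target;
    return fit;
  }
}
```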
    



---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184575686
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
     
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    +    RawFragmentBatch batch;
    +    try {
    +      stats.startWait();
    +      batch = getNextBatch();
    +
    +      // skip over empty batches. we do this since these are basically control messages.
    +      while (batch != null && batch.getHeader().getDef().getRecordCount() == 0
    +          && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
    +        batch = getNextBatch();
    +      }
    +    } finally {
    +      stats.stopWait();
    +    }
    +    return batch;
    +  }
    +
       @Override
       public IterOutcome next() {
         batchLoader.resetRecordCount();
         stats.startProcessing();
    -    try{
    -      RawFragmentBatch batch;
    -      try {
    -        stats.startWait();
    -        batch = getNextBatch();
     
    -        // skip over empty batches. we do this since these are basically control messages.
    -        while (batch != null && batch.getHeader().getDef().getRecordCount() == 0
    -            && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
    -          batch = getNextBatch();
    -        }
    -      } finally {
    -        stats.stopWait();
    -      }
    +    RawFragmentBatch batch = null;
    +    try {
     
    +      batch = getNextNotEmptyBatch();
           first = false;
     
           if (batch == null) {
    --- End diff --
    
    Sure.


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184156922
  
    --- Diff: exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java ---
    @@ -253,10 +261,12 @@ public boolean transferBalance(final BufferLedger target) {
               target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name);
             }
     
    -        boolean overlimit = target.allocator.forceAllocate(size);
    +        // Release first to handle the case where the current and target allocators were part of the same
    +        // parent / child tree.
             allocator.releaseBytes(size);
    +        boolean allocationFit = target.allocator.forceAllocate(size);
    --- End diff --
    
    If this happens, isn't there a problem in that the old allocator has already released the memory? In any case, won't a runtime exception cancel the query anyway, closing all allocators?


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184586222
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
     
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    --- End diff --
    
    No, there is no need to handle `IOException` twice. There are no other methods that throw `IOException`. `SchemaChangeException` is actually never thrown. Please see *TODO* comment and DRILL-2933.


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184554436
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
     
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    +    RawFragmentBatch batch;
    +    try {
    +      stats.startWait();
    --- End diff --
    
    Move outside of try/finally.


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184596895
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java ---
    @@ -77,4 +83,46 @@ public long getByteCount() {
       public boolean isAckSent() {
         return ackSent.get();
       }
    +
    +  /**
    +   * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory
    +   * accounting (that is, the operator should be charged with the body's Drillbuf memory).
    +   *
    +   * <p><b>NOTES -</b>
    +   * <ul>
    +   * <li>This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the
    +   *     owning allocator or b) the target allocator is already the owner
    +   * <li>When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper
    +   *     DrillBuf reference count accounting
    +   * <li>The RPC handling code caches a reference to this RawFragmentBatch object instance; release()
    +   *     calls should be routed to the previous DrillBuf
    +   * </ul>
    +   *
    +   * @param targetAllocator target allocator
    +   * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has
    +   *         been switched to the target allocator); otherwise this operation is a NOOP (current instance
    +   *         returned)
    +   */
    +  public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) {
    +    if (body == null) {
    +      return this; // NOOP
    +    }
    +
    +    if (!body.getLedger().isOwningLedger()
    +     || body.getLedger().isOwner(targetAllocator)) {
    +
    +      return this;
    +    }
    +
    +    int writerIndex               = body.writerIndex();
    +    TransferResult transferResult = body.transferOwnership(targetAllocator);
    +
    +    // Set the index and increment reference count
    +    transferResult.buffer.writerIndex(writerIndex);
    +
    +    // Clear the current Drillbuffer since caller will perform release() on the new one
    +    body.release();
    +
    +    return new RawFragmentBatch(getHeader(), transferResult.buffer, getSender(), false);
    --- End diff --
    
    This actually raises the question of why `newRawFragmentBatch` is released in `IncomingBuffers.batchArrived()` instead of releasing `transferredBuffer` right after `RawFragmentBatch` is constructed in `newRawFragmentBatch`.
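Here is a minimal sketch of the alternative. It assumes the batch constructor takes its own reference to the body. The classes below are hypothetical, not Drill's `DrillBuf`/`RawFragmentBatch`. The code that creates the transferred buffer drops its own reference right after wrapping it, which leaves the batch as the sole owner.

```java
// Hypothetical reference-counted buffer; the creator holds the first reference.
class RcBuffer {
  int refCnt = 1;

  void retain() {
    refCnt++;
  }

  // Returns true when the last reference is dropped.
  boolean release() {
    if (refCnt <= 0) {
      throw new IllegalStateException("over-release");
    }
    return --refCnt == 0;
  }
}

// Hypothetical batch whose constructor retains the body (an assumption of this sketch).
class RcBatch {
  final RcBuffer body;

  RcBatch(RcBuffer body) {
    this.body = body;
    if (body != null) {
      body.retain(); // the batch now holds its own reference
    }
  }

  void release() {
    if (body != null) {
      body.release();
    }
  }
}

class BatchFactory {
  // Drop the local reference right after wrapping, so the batch is the only owner.
  static RcBatch wrap(RcBuffer transferred) {
    RcBatch batch = new RcBatch(transferred);
    if (transferred != null) {
      transferred.release();
    }
    return batch;
  }
}
```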


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184573646
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
     
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    +    RawFragmentBatch batch;
    +    try {
    +      stats.startWait();
    --- End diff --
    
    Sure, but does it really matter or is it your personal preference?


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184559429
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -201,6 +208,11 @@ public IterOutcome next() {
           context.getExecutorState().fail(ex);
           return IterOutcome.STOP;
         } finally {
    +
    +      if (batch != null) {
    +        batch.release();
    +        batch = null;
    --- End diff --
    
    As far as I can see setting `batch` to `null` has no impact here.


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184726839
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -201,6 +208,11 @@ public IterOutcome next() {
           context.getExecutorState().fail(ex);
           return IterOutcome.STOP;
         } finally {
    +
    +      if (batch != null) {
    +        batch.release();
    +        batch = null;
    --- End diff --
    
    The point of this pattern is that anyone who continues to use this variable afterwards needs to know what can and cannot be used.


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184585775
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
     
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    +    RawFragmentBatch batch;
    +    try {
    +      stats.startWait();
    --- End diff --
    
    Yes, it does matter. If something goes wrong during `startWait()` there should be no call to `stopWait()`.
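Here is a small sketch of why the placement matters. It uses a hypothetical `WaitStats` that rejects an unmatched `stopWait()`; Drill's operator stats may behave differently. With `startWait()` inside the `try`, a failure in `startWait()` still triggers `stopWait()` in `finally`, and that second exception hides the original one.

```java
import java.util.function.Supplier;

// Hypothetical wait-time counter that rejects an unmatched stopWait().
class WaitStats {
  private final boolean failOnStart;
  int started;
  int stopped;

  WaitStats(boolean failOnStart) {
    this.failOnStart = failOnStart;
  }

  void startWait() {
    if (failOnStart) {
      throw new IllegalStateException("timer unavailable");
    }
    started++;
  }

  void stopWait() {
    if (stopped >= started) {
      throw new IllegalStateException("unbalanced stopWait");
    }
    stopped++;
  }
}

class WaitPatterns {
  // startWait() outside the try: stopWait() runs only if startWait() succeeded.
  static <T> T waitOutside(WaitStats stats, Supplier<T> fetch) {
    stats.startWait();
    try {
      return fetch.get();
    } finally {
      stats.stopWait();
    }
  }

  // startWait() inside the try: if it throws, finally still calls stopWait(),
  // and stopWait()'s exception replaces the original one.
  static <T> T waitInside(WaitStats stats, Supplier<T> fetch) {
    try {
      stats.startWait();
      return fetch.get();
    } finally {
      stats.stopWait();
    }
  }
}
```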


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184195748
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java ---
    @@ -77,4 +83,46 @@ public long getByteCount() {
       public boolean isAckSent() {
         return ackSent.get();
       }
    +
    +  /**
    +   * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory
    +   * accounting (that is, the operator should be charged with the body's Drillbuf memory).
    +   *
    +   * <p><b>NOTES -</b>
    +   * <ul>
    +   * <li>This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the
    +   *     owning allocator or b) the target allocator is already the owner
    +   * <li>When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper
    +   *     DrillBuf reference count accounting
    +   * <li>The RPC handling code caches a reference to this RawFragmentBatch object instance; release()
    +   *     calls should be routed to the previous DrillBuf
    +   * </ul>
    +   *
    +   * @param targetAllocator target allocator
    +   * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has
    +   *         been switched to the target allocator); otherwise this operation is a NOOP (current instance
    +   *         returned)
    +   */
    +  public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) {
    +    if (body == null) {
    +      return this; // NOOP
    +    }
    +
    +    if (!body.getLedger().isOwningLedger()
    +     || body.getLedger().isOwner(targetAllocator)) {
    +
    +      return this;
    +    }
    +
    +    int writerIndex               = body.writerIndex();
    +    TransferResult transferResult = body.transferOwnership(targetAllocator);
    --- End diff --
    
    - After a call to getNext(), the operator code checks whether the transfer has succeeded, so that an OOM condition is reported within the fragment
    - When the change of ownership happens from parent to child, there should not be any OOM condition, since this is internal accounting
    
    For code clarity, I can improve the javadoc to ask the caller to check for an OOM condition (similar to the BaseAllocator behavior).
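The calling convention described above can be sketched as follows. `Budget`, `Outcome` and `ReceiveStep` are hypothetical stand-ins for the allocators and `IterOutcome`. The transfer always completes and only reports whether the operator stayed within its limit. The caller maps a miss to `OUT_OF_MEMORY`.

```java
// Hypothetical budget, outcome enum and receive step; stand-ins, not Drill's classes.
class Budget {
  final long limit;
  long used;

  Budget(long limit, long used) {
    this.limit = limit;
    this.used = used;
  }

  // Always accounts the bytes; returns false when over the limit.
  boolean forceAllocate(long n) {
    used += n;
    return used <= limit;
  }

  void releaseBytes(long n) {
    used -= n;
  }
}

enum Outcome { OK, OUT_OF_MEMORY }

class ReceiveStep {
  // The transfer itself always completes; the caller turns "did not fit" into an OOM outcome.
  static Outcome receive(Budget fragment, Budget operator, long batchBytes) {
    fragment.releaseBytes(batchBytes);
    boolean fit = operator.forceAllocate(batchBytes);
    return fit ? Outcome.OK : Outcome.OUT_OF_MEMORY;
  }
}
```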


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184585959
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
     
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    +    RawFragmentBatch batch;
    +    try {
    +      stats.startWait();
    +      batch = getNextBatch();
    +
    +      // skip over empty batches. we do this since these are basically control messages.
    +      while (batch != null && batch.getHeader().getDef().getRecordCount() == 0
    --- End diff --
    
    Please explain why it can't be released. Note that the release is done for the batch that will *not* be returned from the method.
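For reference, here is a sketch of the variant being asked about. It uses hypothetical classes and a simplified skip condition; the real loop also keeps the first schema-carrying batch. Every empty batch that is skipped is released inside the method, and the caller owns only the batch that is returned.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical received batch with a record count and a reference count.
class RecvBatch {
  final int recordCount;
  int refCount = 1;

  RecvBatch(int recordCount) {
    this.recordCount = recordCount;
  }

  void release() {
    refCount--;
  }
}

class EmptyBatchSkipper {
  // Returns the next batch with records, releasing every empty batch it skips.
  static RecvBatch nextNotEmpty(Deque<RecvBatch> incoming) {
    RecvBatch batch = incoming.pollFirst();
    while (batch != null && batch.recordCount == 0) {
      batch.release(); // never handed to the caller, so drop the reference here
      batch = incoming.pollFirst();
    }
    return batch;
  }
}
```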


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184572864
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
     
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    --- End diff --
    
    Why is that? This method is used only once, and doing so would make us duplicate exception-handling code. I would appreciate it if you provided reasons when making suggestions, to avoid the back and forth.


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184084128
  
    --- Diff: exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java ---
    @@ -253,10 +261,12 @@ public boolean transferBalance(final BufferLedger target) {
               target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name);
             }
     
    -        boolean overlimit = target.allocator.forceAllocate(size);
    +        // Release first to handle the case where the current and target allocators were part of the same
    +        // parent / child tree.
             allocator.releaseBytes(size);
    +        boolean allocationFit = target.allocator.forceAllocate(size);
    --- End diff --
    
    Move `owningLedger = target;` before `return target.allocator.forceAllocate(size)`, as there is no check of the call's result anyway.


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184807153
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
     
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    +    RawFragmentBatch batch;
    +    try {
    +      stats.startWait();
    --- End diff --
    
    I see; I will fix any such occurrences when the opportunity presents itself, as I have seen both patterns in the Drill code base.


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184559236
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
     
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    +    RawFragmentBatch batch;
    +    try {
    +      stats.startWait();
    +      batch = getNextBatch();
    +
    +      // skip over empty batches. we do this since these are basically control messages.
    +      while (batch != null && batch.getHeader().getDef().getRecordCount() == 0
    +          && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
    +        batch = getNextBatch();
    +      }
    +    } finally {
    +      stats.stopWait();
    +    }
    +    return batch;
    +  }
    +
       @Override
       public IterOutcome next() {
         batchLoader.resetRecordCount();
         stats.startProcessing();
    -    try{
    -      RawFragmentBatch batch;
    -      try {
    -        stats.startWait();
    -        batch = getNextBatch();
     
    -        // skip over empty batches. we do this since these are basically control messages.
    -        while (batch != null && batch.getHeader().getDef().getRecordCount() == 0
    -            && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
    -          batch = getNextBatch();
    -        }
    -      } finally {
    -        stats.stopWait();
    -      }
    +    RawFragmentBatch batch = null;
    +    try {
     
    +      batch = getNextNotEmptyBatch();
           first = false;
     
           if (batch == null) {
    --- End diff --
    
    I think that `batchLoader.clear()` is not required as it is part of `close()`. Consider changing the return to a ternary operator. 


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184140733
  
    --- Diff: exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java ---
    @@ -253,10 +261,12 @@ public boolean transferBalance(final BufferLedger target) {
               target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name);
             }
     
    -        boolean overlimit = target.allocator.forceAllocate(size);
    +        // Release first to handle the case where the current and target allocators were part of the same
    +        // parent / child tree.
             allocator.releaseBytes(size);
    +        boolean allocationFit = target.allocator.forceAllocate(size);
    --- End diff --
    
    What if a runtime exception is thrown during forceAllocate(...)?  


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184590436
  
    --- Diff: exec/memory/base/src/main/java/org/apache/drill/exec/memory/AllocationManager.java ---
    @@ -253,10 +261,12 @@ public boolean transferBalance(final BufferLedger target) {
               target.historicalLog.recordEvent("incoming(from %s)", owningLedger.allocator.name);
             }
     
    -        boolean overlimit = target.allocator.forceAllocate(size);
    +        // Release first to handle the case where the current and target allocators were part of the same
    +        // parent / child tree.
             allocator.releaseBytes(size);
    +        boolean allocationFit = target.allocator.forceAllocate(size);
    --- End diff --
    
    In this case, changing the order of `forceAllocate()` and `releaseBytes()` is incorrect: ownership has not changed yet, but the old owner no longer accounts for that memory.
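    A toy model may make the accounting hole concrete. It assumes hypothetical `ToyAllocator` and `ToyTransfer` classes (not Drill's `AllocationManager` or allocator API). Each allocator is just a byte counter, and a test hook makes `forceAllocate()` throw. With release-first ordering, a throw leaves the bytes charged to nobody. One possible mitigation, sketched here only as an option, is to restore the source charge before rethrowing.

    ```java
    // Hypothetical toy model, not Drill's AllocationManager or allocator classes:
    // each allocator is only a byte counter with a limit.
    final class ToyAllocator {
      final long limit;
      long allocated;
      boolean failNextAllocation; // test hook that simulates a runtime failure

      ToyAllocator(long limit) {
        this.limit = limit;
      }

      /** Charges the bytes unless it throws; returns false when the limit is exceeded. */
      boolean forceAllocate(long size) {
        if (failNextAllocation) {
          failNextAllocation = false;
          throw new IllegalStateException("simulated failure in forceAllocate");
        }
        allocated += size;
        return allocated <= limit;
      }

      void releaseBytes(long size) {
        allocated -= size;
      }
    }

    final class ToyTransfer {
      /** Order used in the diff: if forceAllocate throws, nobody accounts for the bytes. */
      static boolean releaseThenAllocate(ToyAllocator source, ToyAllocator target, long size) {
        source.releaseBytes(size);
        return target.forceAllocate(size);
      }

      /** Same order, but the source charge is restored if the target charge throws. */
      static boolean releaseThenAllocateWithRollback(ToyAllocator source, ToyAllocator target, long size) {
        source.releaseBytes(size);
        try {
          return target.forceAllocate(size);
        } catch (RuntimeException e) {
          source.forceAllocate(size); // give the bytes back to the old owner before rethrowing
          throw e;
        }
      }
    }
    ```

    Charging the target first (the pre-diff order) also avoids the hole. However, as the diff comment suggests, it can presumably count the same bytes twice for a moment when both allocators share a parent.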


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r183898352
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -182,13 +184,18 @@ public IterOutcome next() {
             return IterOutcome.OUT_OF_MEMORY;
           }
     
    +      // Transfer the ownership of this raw-batch to this operator for proper memory statistics reporting
    +      batch = batch.transferBodyOwnership(oContext.getAllocator());
    --- End diff --
    
    @HanumathRao,
    
    - This will not affect the outcome, since the unordered receiver's allocator is a child of the context allocator (the minor fragment)
    - Doing the check earlier seemed cleaner, as there is no point in changing ownership when an out-of-memory condition has already occurred
    - I think the code was written this way because the network handlers implicitly receive the batch and then change its ownership (from the RPC allocator to the context allocator); this step could lead to an out-of-memory condition at the fragment level
    - If your point is about reporting, that is, attributing the OOM condition to the child operator, then I don't have a problem with that.
    @parthchandra, what do you think?
     


---

[GitHub] drill issue #1237: DRILL-6348: Fixed code so that Unordered Receiver reports...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on the issue:

    https://github.com/apache/drill/pull/1237
  
    @vrozov, your observation is valid; we need additional JIRAs to fix the reporting problem.
    **Current Fix**
    - Before this change, the UnorderedReceiver did not account for any consumed memory
    - This fix charges the operator only when it consumes buffers
    
    **Potential Enhancements**
    Solution I
    - Create a new fragment child allocator that owns the received (not yet consumed) batches
    - Improve the UI to report this allocator's size
    - This solution is simple and complementary to the current work
    
    Solution II
    - Have the receiver operator drain the batch queue
    - Essentially, the receiver would have a private queue from which to consume batches
    - I personally don't like this solution, as it would prevent us from improving the Drill network protocol
    - Draining the batches for the sake of reporting would make it harder for the network layer to prefetch batches optimally; the queue size should indicate how many batches are pending.


---

[GitHub] drill issue #1237: DRILL-6348: Fixed code so that Unordered Receiver reports...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on the issue:

    https://github.com/apache/drill/pull/1237
  
    @vrozov, I have implemented the provided suggestions.


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184068798
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -153,8 +153,10 @@ private RawFragmentBatch getNextBatch() throws IOException {
       public IterOutcome next() {
         batchLoader.resetRecordCount();
         stats.startProcessing();
    +
    +    RawFragmentBatch batch = null;
    --- End diff --
    
    Is it necessary to release `batch` when it is overwritten on line 167?


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184146733
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -182,13 +184,18 @@ public IterOutcome next() {
             return IterOutcome.OUT_OF_MEMORY;
           }
     
    +      // Transfer the ownership of this raw-batch to this operator for proper memory statistics reporting
    +      batch = batch.transferBodyOwnership(oContext.getAllocator());
    +
           final RecordBatchDef rbd = batch.getHeader().getDef();
           final boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
           // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
           // SchemaChangeException, so check/clean catch clause below.
           stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount());
     
           batch.release();
    +      batch = null;
    --- End diff --
    
    Why can't it be done in finally?
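    For illustration, here is a minimal sketch of the release-in-`finally` pattern. It assumes a hypothetical `ReleasableBatch` that only tracks a reference count, plus a hypothetical `Loader` callback standing in for the load step. The returned strings only mimic `IterOutcome` values. The batch is released exactly once whether loading succeeds or throws.

    ```java
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical stand-in for RawFragmentBatch that only tracks a reference count.
    final class ReleasableBatch {
      final AtomicInteger refCnt = new AtomicInteger(1);

      void release() {
        if (refCnt.decrementAndGet() < 0) {
          throw new IllegalStateException("batch released twice");
        }
      }
    }

    final class FinallyReleaseReceiver {
      /** Hypothetical load callback; it may throw, as the load step can. */
      interface Loader {
        void load(ReleasableBatch batch);
      }

      /** If a batch was received, it is released exactly once on every path out of the method. */
      static String next(ReleasableBatch incoming, Loader loader) {
        ReleasableBatch batch = incoming;
        try {
          if (batch == null) {
            return "NONE";
          }
          loader.load(batch);
          return "OK";
        } catch (RuntimeException e) {
          return "STOP";
        } finally {
          if (batch != null) {
            batch.release();
            batch = null; // no code after this point can reuse the released batch
          }
        }
      }
    }
    ```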


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r185064842
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -182,13 +189,15 @@ public IterOutcome next() {
             return IterOutcome.OUT_OF_MEMORY;
           }
     
    +      // Transfer the ownership of this raw-batch to this operator for proper memory statistics reporting
    +      batch = batch.transferBodyOwnership(oContext.getAllocator());
    --- End diff --
    
    This should probably be done inside the [`RecordBatchLoader.load()`](https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java#L78) method.
    Note that `MergingRecordBatch` has similar code and so probably suffers from the same memory accounting issue. All other uses of `RecordBatchLoader.load()` appear to be in the client code or test code so we are unlikely to break anything by making the change in `RecordBatchLoader`.



---

[GitHub] drill issue #1237: DRILL-6348: Fixed code so that Unordered Receiver reports...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/drill/pull/1237
  
    IMO, it is better not to report memory usage at all than to report a wrong number. When incoming batches are accumulated in a queue, they should be reported as owned by the receiver. Taking ownership just before passing a batch to the next operator does not sound right to me.
    
    I don't think it is necessary to create a new fragment child allocator. The receiver's allocator should be used instead of the fragment allocator when an incoming batch is placed into the queue.
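    A minimal accounting model can show the difference between the two approaches. It is hypothetical (`ReceiverAccountingModel` is not Drill code) and treats each allocator as a plain byte counter. With late charging, queued batches do not appear in the receiver's numbers until they are dequeued. Charging the receiver on enqueue reports them immediately.

    ```java
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Hypothetical model, not Drill code: "allocators" are plain byte counters.
    final class ReceiverAccountingModel {
      long fragmentBytes; // charged to the fragment allocator
      long receiverBytes; // charged to the receiver operator's allocator
      private final boolean chargeReceiverOnEnqueue;
      private final Deque<Long> queue = new ArrayDeque<>();

      ReceiverAccountingModel(boolean chargeReceiverOnEnqueue) {
        this.chargeReceiverOnEnqueue = chargeReceiverOnEnqueue;
      }

      /** Network side: a batch arrives and is placed in the receiver's queue. */
      void enqueue(long size) {
        if (chargeReceiverOnEnqueue) {
          receiverBytes += size;
        } else {
          fragmentBytes += size;
        }
        queue.addLast(size);
      }

      /** Operator side: take the next batch; with late charging, ownership moves only now. */
      long dequeue() {
        long size = queue.removeFirst();
        if (!chargeReceiverOnEnqueue) {
          fragmentBytes -= size;
          receiverBytes += size;
        }
        return size;
      }
    }
    ```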


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184558425
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
     
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    +    RawFragmentBatch batch;
    +    try {
    +      stats.startWait();
    +      batch = getNextBatch();
    +
    +      // skip over empty batches. we do this since these are basically control messages.
    +      while (batch != null && batch.getHeader().getDef().getRecordCount() == 0
    --- End diff --
    
    Consider inverting the condition, something like:
    ```
          while (true) {
            RawFragmentBatch batch = getNextBatch();
            if (batch == null) {
              break; // no more batches
            }
            RecordBatchDef recordBatchDef = batch.getHeader().getDef();
            if (recordBatchDef.getRecordCount() > 0 || (first && recordBatchDef.getFieldCount() > 0)) {
              return batch;
            }
            // empty batch (a control message): release it before fetching the next one
            batch.release();
          }
    ```
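    As a quick sanity check, De Morgan's laws turn the negation of the original skip condition, `recordCount == 0 && (!first || fieldCount == 0)`, into `recordCount != 0 || (first && fieldCount != 0)`. That matches the suggested condition whenever both counts are non-negative. The hypothetical `SkipConditionCheck` class verifies this exhaustively over a small grid:

    ```java
    // Hypothetical helper, not Drill code: compares the two loop conditions.
    final class SkipConditionCheck {
      /** Loop condition from the original diff: true means "skip this batch". */
      static boolean skip(int recordCount, int fieldCount, boolean first) {
        return recordCount == 0 && (!first || fieldCount == 0);
      }

      /** Condition from the suggested rewrite: true means "return this batch". */
      static boolean keep(int recordCount, int fieldCount, boolean first) {
        return recordCount > 0 || (first && fieldCount > 0);
      }

      /** Checks that keep == !skip over a small grid of non-negative counts. */
      static boolean equivalentForNonNegativeCounts() {
        for (int records = 0; records <= 3; records++) {
          for (int fields = 0; fields <= 3; fields++) {
            for (boolean first : new boolean[] {false, true}) {
              if (keep(records, fields, first) == skip(records, fields, first)) {
                return false;
              }
            }
          }
        }
        return true;
      }
    }
    ```

    For example, an empty batch that carries fields is kept only when it is the first one (it supplies the schema), and skipped afterwards.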


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184085544
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -153,8 +153,10 @@ private RawFragmentBatch getNextBatch() throws IOException {
       public IterOutcome next() {
         batchLoader.resetRecordCount();
         stats.startProcessing();
    +
    +    RawFragmentBatch batch = null;
         try{
    -      RawFragmentBatch batch;
    +
    --- End diff --
    
    Consider moving the try block into a separate method.


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184574622
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -201,6 +208,11 @@ public IterOutcome next() {
           context.getExecutorState().fail(ex);
           return IterOutcome.STOP;
         } finally {
    +
    +      if (batch != null) {
    +        batch.release();
    +        batch = null;
    --- End diff --
    
    This pattern ensures that the batch cannot be accessed after it is released (e.g., by future additions to the current code).


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184589826
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -201,6 +208,11 @@ public IterOutcome next() {
           context.getExecutorState().fail(ex);
           return IterOutcome.STOP;
         } finally {
    +
    +      if (batch != null) {
    +        batch.release();
    +        batch = null;
    --- End diff --
    
    OK, but note that future changes to this code could just as easily delete the `batch = null` assignment, and that `batch` can still be used after the `release()` call. For example, with the current implementation, `batch.getHeader()` remains perfectly valid after the batch has been released.
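    One way to make use-after-release fail loudly instead of silently succeeding is a guard flag checked on every access. The sketch assumes a hypothetical `GuardedBatch` wrapper (not part of Drill). It only illustrates the idea and is not a proposed change to `RawFragmentBatch`:

    ```java
    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical wrapper, not Drill code: fails fast when a batch is touched after release().
    final class GuardedBatch<H> {
      private final H header;
      private final AtomicBoolean released = new AtomicBoolean(false);

      GuardedBatch(H header) {
        this.header = header;
      }

      H getHeader() {
        if (released.get()) {
          throw new IllegalStateException("batch used after release()");
        }
        return header;
      }

      /** Detects a double release instead of silently decrementing a reference count twice. */
      void release() {
        if (!released.compareAndSet(false, true)) {
          throw new IllegalStateException("batch released twice");
        }
        // a real implementation would release the body buffer here
      }
    }
    ```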


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184119110
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java ---
    @@ -77,4 +83,46 @@ public long getByteCount() {
       public boolean isAckSent() {
         return ackSent.get();
       }
    +
    +  /**
    +   * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory
    +   * accounting (that is, the operator should be charged with the body's Drillbuf memory).
    +   *
    +   * <p><b>NOTES -</b>
    +   * <ul>
    +   * <li>This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the
    +   *     owning allocator or b) the target allocator is already the owner
    +   * <li>When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper
    +   *     DrillBuf reference count accounting
    +   * <li>The RPC handling code caches a reference to this RawFragmentBatch object instance; release()
    +   *     calls should be routed to the previous DrillBuf
    +   * </ul>
    +   *
    +   * @param targetAllocator target allocator
    +   * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has
    +   *         been switched to the target allocator); otherwise this operation is a NOOP (current instance
    +   *         returned)
    +   */
    +  public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) {
    +    if (body == null) {
    +      return this; // NOOP
    +    }
    +
    +    if (!body.getLedger().isOwningLedger()
    +     || body.getLedger().isOwner(targetAllocator)) {
    +
    +      return this;
    +    }
    +
    +    int writerIndex               = body.writerIndex();
    +    TransferResult transferResult = body.transferOwnership(targetAllocator);
    --- End diff --
    
    yes, it is. 


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184727914
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -149,25 +149,32 @@ private RawFragmentBatch getNextBatch() throws IOException {
         }
       }
     
    +  private RawFragmentBatch getNextNotEmptyBatch() throws IOException {
    +    RawFragmentBatch batch;
    +    try {
    +      stats.startWait();
    --- End diff --
    
    OK, good point; I have seen both practices used within the Drill code. That said, I don't think this is a big deal, as I don't see startWait() failing: it merely reads the nano time.
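    The usual idiom is to call the "start" method before `try`, so the `finally` block only runs once the wait has actually started. Here is a minimal sketch. It assumes a hypothetical `WaitStats` stopwatch modelled on `startWait()`/`stopWait()` (not Drill's `OperatorStats`) and a hypothetical `Fetch` callback standing in for `getNextBatch()`:

    ```java
    import java.io.IOException;

    // Hypothetical stopwatch modelled on startWait()/stopWait(); not Drill's OperatorStats.
    final class WaitStats {
      private boolean waiting;
      private long waitStart;
      long totalWaitNanos;
      int unmatchedStops; // stopWait() calls with no preceding startWait()

      void startWait() {
        waitStart = System.nanoTime();
        waiting = true;
      }

      void stopWait() {
        if (!waiting) {
          unmatchedStops++;
          return;
        }
        totalWaitNanos += System.nanoTime() - waitStart;
        waiting = false;
      }
    }

    final class WaitIdiom {
      /** Hypothetical blocking fetch, standing in for getNextBatch(). */
      interface Fetch<T> {
        T get() throws IOException;
      }

      /** startWait() runs before try, so stopWait() in finally always has a matching start. */
      static <T> T timed(WaitStats stats, Fetch<T> fetch) throws IOException {
        stats.startWait();
        try {
          return fetch.get();
        } finally {
          stats.stopWait();
        }
      }
    }
    ```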


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184192630
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java ---
    @@ -77,4 +83,46 @@ public long getByteCount() {
       public boolean isAckSent() {
         return ackSent.get();
       }
    +
    +  /**
    +   * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory
    +   * accounting (that is, the operator should be charged with the body's Drillbuf memory).
    +   *
    +   * <p><b>NOTES -</b>
    +   * <ul>
    +   * <li>This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the
    +   *     owning allocator or b) the target allocator is already the owner
    +   * <li>When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper
    +   *     DrillBuf reference count accounting
    +   * <li>The RPC handling code caches a reference to this RawFragmentBatch object instance; release()
    +   *     calls should be routed to the previous DrillBuf
    +   * </ul>
    +   *
    +   * @param targetAllocator target allocator
    +   * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has
    +   *         been switched to the target allocator); otherwise this operation is a NOOP (current instance
    +   *         returned)
    +   */
    +  public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) {
    +    if (body == null) {
    +      return this; // NOOP
    +    }
    +
    +    if (!body.getLedger().isOwningLedger()
    +     || body.getLedger().isOwner(targetAllocator)) {
    +
    +      return this;
    +    }
    +
    +    int writerIndex               = body.writerIndex();
    +    TransferResult transferResult = body.transferOwnership(targetAllocator);
    +
    +    // Set the index and increment reference count
    +    transferResult.buffer.writerIndex(writerIndex);
    +
    +    // Clear the current Drillbuffer since caller will perform release() on the new one
    +    body.release();
    +
    +    return new RawFragmentBatch(getHeader(), transferResult.buffer, getSender(), false);
    --- End diff --
    
    Look at IncomingBuffers.batchArrived():
    - It holds a reference to the RawFragmentBatch
    - It holds a lock on the parent collector for insertion
    - However, the collector's enqueue method itself is not synchronized on `this`
    - Instead, it relies on the synchronization provided by the RawBatchBuffer queue
    - This means a getNext() call can dequeue a RawFragmentBatch instance
    - Concurrently, IncomingBuffers.batchArrived() will invoke release() on the cached reference
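    A simplified, single-threaded model may clarify why `transferBodyOwnership()` returns a new instance instead of swapping `body` in place. It assumes, hypothetically, that the RPC path and the queue each hold one reference to the received buffer. `RcBuf` and `RcBatch` are illustrative names, not Drill classes, and no real memory is moved:

    ```java
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical reference-counted buffer; not Drill's DrillBuf.
    final class RcBuf {
      final String owner;
      final AtomicInteger refCnt;

      RcBuf(String owner, int initialRefs) {
        this.owner = owner;
        this.refCnt = new AtomicInteger(initialRefs);
      }

      void release() {
        if (refCnt.decrementAndGet() < 0) {
          throw new IllegalStateException("over-release of buffer owned by " + owner);
        }
      }
    }

    // Hypothetical stand-in for RawFragmentBatch.
    final class RcBatch {
      final RcBuf body;

      RcBatch(RcBuf body) {
        this.body = body;
      }

      void release() {
        body.release();
      }

      /**
       * Returns a NEW wrapper around a buffer owned by newOwner. This instance keeps pointing at
       * the old buffer, so a later release() through a cached reference cannot touch the new one.
       */
      RcBatch transferTo(String newOwner) {
        RcBuf transferred = new RcBuf(newOwner, 1);
        body.release(); // drop the queue's reference to the old buffer
        return new RcBatch(transferred);
      }
    }
    ```

    If `transferTo()` instead replaced `body` inside the cached instance, the RPC layer's later `release()` would decrement the transferred buffer rather than the old one. The old buffer would leak, and the new one would be released while the operator still uses it.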


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184150526
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---
    @@ -153,8 +153,10 @@ private RawFragmentBatch getNextBatch() throws IOException {
       public IterOutcome next() {
         batchLoader.resetRecordCount();
         stats.startProcessing();
    +
    +    RawFragmentBatch batch = null;
    --- End diff --
    
    It is not obvious that `body` is `null` when the `if` condition is met. Additionally, what if the release logic changes in the future? It is bad practice to rely on implementation details.


---

[GitHub] drill pull request #1237: DRILL-6348: Fixed code so that Unordered Receiver ...

Posted by sachouche <gi...@git.apache.org>.
Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1237#discussion_r184730050
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java ---
    @@ -77,4 +83,46 @@ public long getByteCount() {
       public boolean isAckSent() {
         return ackSent.get();
       }
    +
    +  /**
    +   * Transfer ownership of this DrillBuf to the target allocator. This is done for better memory
    +   * accounting (that is, the operator should be charged with the body's Drillbuf memory).
    +   *
    +   * <p><b>NOTES -</b>
    +   * <ul>
    +   * <li>This operation is a NOOP when a) the current allocator (associated with the DrillBuf) is not the
    +   *     owning allocator or b) the target allocator is already the owner
    +   * <li>When transfer happens, a new RawFragmentBatch instance is allocated; this is done for proper
    +   *     DrillBuf reference count accounting
    +   * <li>The RPC handling code caches a reference to this RawFragmentBatch object instance; release()
    +   *     calls should be routed to the previous DrillBuf
    +   * </ul>
    +   *
    +   * @param targetAllocator target allocator
    +   * @return a new {@link RawFragmentBatch} object instance on success (where the buffer ownership has
    +   *         been switched to the target allocator); otherwise this operation is a NOOP (current instance
    +   *         returned)
    +   */
    +  public RawFragmentBatch transferBodyOwnership(BufferAllocator targetAllocator) {
    +    if (body == null) {
    +      return this; // NOOP
    +    }
    +
    +    if (!body.getLedger().isOwningLedger()
    +     || body.getLedger().isOwner(targetAllocator)) {
    +
    +      return this;
    +    }
    +
    +    int writerIndex               = body.writerIndex();
    +    TransferResult transferResult = body.transferOwnership(targetAllocator);
    +
    +    // Set the index and increment reference count
    +    transferResult.buffer.writerIndex(writerIndex);
    +
    +    // Clear the current Drillbuffer since caller will perform release() on the new one
    +    body.release();
    +
    +    return new RawFragmentBatch(getHeader(), transferResult.buffer, getSender(), false);
    --- End diff --
    
    We can take up such an enhancement as part of another JIRA, since any changes within the RPC layer have to be thoroughly tested.


---