Posted to issues@tez.apache.org by "Sungwoo (Jira)" <ji...@apache.org> on 2020/08/07 16:26:00 UTC

[jira] [Comment Edited] (TEZ-4075) Tez: Reimplement tez.runtime.transfer.data-via-events.enabled

    [ https://issues.apache.org/jira/browse/TEZ-4075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173259#comment-17173259 ] 

Sungwoo edited comment on TEZ-4075 at 8/7/20, 4:25 PM:
-------------------------------------------------------

I would like to submit a patch for reducing lock contention in ShuffleManager, but I am having trouble figuring out what to do with the field *completedInputSet* in ShuffleManager. The final patch in this Jira uses the following code (which was removed in TEZ-2196):

{code:java}
    if (!completedInputSet.get(inputIdentifier)) {
      synchronized (completedInputSet) {
        if (!completedInputSet.get(inputIdentifier)) {
{code}

In the rest of ShuffleManager, however, *completedInputSet* is not guarded with *synchronized (completedInputSet)*; it appears to be guarded with *lock.lock()* instead.
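
To illustrate why the mixed guarding matters, here is a minimal sketch (not Tez code; the class and method names are hypothetical) showing that holding a *ReentrantLock* does not stop another thread from entering a *synchronized* block on the BitSet, because the object's monitor and the explicit lock are independent:

```java
import java.util.BitSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of Tez.
final class MixedGuardDemo {

  /** Returns true if a thread entered synchronized (set) while the lock was held. */
  static boolean monitorEnteredWhileLockHeld() {
    final ReentrantLock lock = new ReentrantLock();
    final BitSet completedInputSet = new BitSet();
    final AtomicBoolean entered = new AtomicBoolean(false);

    lock.lock(); // guarding style used in the rest of ShuffleManager
    try {
      Thread t = new Thread(() -> {
        // guarding style used by the snippet in the patch
        synchronized (completedInputSet) {
          completedInputSet.set(0);
          entered.set(true);
        }
      });
      t.start();
      t.join(5000); // wait for the other thread while still holding the lock
      return entered.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) {
    System.out.println("entered while lock held: " + monitorEnteredWhileLockHeld());
  }
}
```

If the method returns true, the second thread modified the BitSet while the first thread held the explicit lock, so code paths using the two guards can run concurrently.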

Could someone explain the intended use of *lock.lock()* in ShuffleManager? From my analysis, it looks like *lock.lock()* is used to guard:

{code:java}
final BitSet completedInputSet;
private final BlockingQueue<InputHost> pendingHosts;
private long lastProgressTime;
private long totalBytesShuffledTillNow;
{code}

(*pendingHosts* is thread-safe, but the comment in the code says that it should be guarded with *lock.lock()*.)
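
For reference, below is a rough sketch of what I understand by "guarded with *lock.lock()*": every read and write of the listed state goes through the same lock. The class *GuardedShuffleState* and its methods are hypothetical and only illustrate the pattern; this is not the actual ShuffleManager code.

```java
import java.util.BitSet;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration; not the actual ShuffleManager.
final class GuardedShuffleState {
  private final ReentrantLock lock = new ReentrantLock();
  private final BitSet completedInputSet;  // guarded by lock
  private long lastProgressTime;           // guarded by lock
  private long totalBytesShuffledTillNow;  // guarded by lock

  GuardedShuffleState(int numInputs) {
    this.completedInputSet = new BitSet(numInputs);
  }

  /** Records a completed input once; returns false if it was already recorded. */
  boolean recordCompletedInput(int inputIdentifier, long bytes, long nowMillis) {
    lock.lock();
    try {
      if (completedInputSet.get(inputIdentifier)) {
        return false; // duplicate completion, state unchanged
      }
      completedInputSet.set(inputIdentifier);
      totalBytesShuffledTillNow += bytes;
      lastProgressTime = nowMillis;
      return true;
    } finally {
      lock.unlock();
    }
  }

  int completedCount() {
    lock.lock();
    try {
      return completedInputSet.cardinality();
    } finally {
      lock.unlock();
    }
  }

  long totalBytes() {
    lock.lock();
    try {
      return totalBytesShuffledTillNow;
    } finally {
      lock.unlock();
    }
  }
}
```

With a single lock, the check and the update of *completedInputSet* happen atomically, so no separate *synchronized* block is needed.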





> Tez: Reimplement tez.runtime.transfer.data-via-events.enabled
> -------------------------------------------------------------
>
>                 Key: TEZ-4075
>                 URL: https://issues.apache.org/jira/browse/TEZ-4075
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Gopal Vijayaraghavan
>            Assignee: Richard Zhang
>            Priority: Major
>             Fix For: 0.10.1
>
>         Attachments: TEZ-4075.10.patch, TEZ-4075.15.patch, TEZ-4075.16.patch, TEZ-4075.enable-dme.16.patch, Tez-4075.5.patch, Tez-4075.8.patch
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> This was factored out by TEZ-2196, which skips buffers for 1-partition data exchanges (and therefore goes to disk directly).
> {code}
>     // Removed code (left-hand side of the diff):
>     if (shufflePayload.hasData()) {
>       DataProto dataProto = shufflePayload.getData();
>       FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(),
>           dataProto.getCompressedLength(), srcAttemptIdentifier);
>       moveDataToFetchedInput(dataProto, fetchedInput, hostIdentifier);
>       shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
>     } else {
>       shuffleManager.addKnownInput(shufflePayload.getHost(),
>           shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
>     }
>
>     // Replacement (right-hand side of the diff):
>     shuffleManager.addKnownInput(shufflePayload.getHost(),
>         shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
> {code}
> got removed in 
> https://github.com/apache/tez/commit/1ba1f927c16a1d7c273b6cd1a8553e5269d1541a
> It would be better to buffer up to the 512-byte limit for the event size before writing to disk, since creating a new file always incurs disk traffic, even if the file is eventually served out of the buffer cache.
> The total overhead of receiving an event, then firing an HTTP call to fetch the data, etc., adds approximately 100-150 ms to a query. Transferring the data through the event skips the disk entirely and also removes the extra IOPS incurred.
> This channel is not suitable for large-scale event transport, but the workload here deals specifically with 1-row control tables, which consume more bandwidth with HTTP headers and hostnames than the 93-byte payload.
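
The size-based decision described above could be sketched roughly as follows. This is only an illustration under stated assumptions: the class *InlineEventPolicy*, the constant name, and the method are hypothetical and are not taken from the Tez patches; only the 512-byte limit comes from the description.

```java
// Hypothetical helper for illustration; not part of Tez.
final class InlineEventPolicy {
  // 512-byte event-size limit mentioned in the issue description.
  static final int MAX_INLINE_BYTES = 512;

  private InlineEventPolicy() {
  }

  /**
   * Returns true if a payload of the given size is small enough to be
   * carried in the event itself instead of being fetched over HTTP.
   */
  static boolean shouldInline(int payloadLength) {
    return payloadLength >= 0 && payloadLength <= MAX_INLINE_BYTES;
  }
}
```

Under this sketch, a 93-byte control-table payload would travel inside the event, while anything above the limit would still go through the regular fetch path.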



--
This message was sent by Atlassian Jira
(v8.3.4#803005)