Posted to issues@tez.apache.org by "Hitesh Shah (JIRA)" <ji...@apache.org> on 2013/08/19 20:59:48 UTC

[jira] [Comment Edited] (TEZ-376) Reduce Processor can parallelize I/O/Processor initialization

    [ https://issues.apache.org/jira/browse/TEZ-376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13744113#comment-13744113 ] 

Hitesh Shah edited comment on TEZ-376 at 8/19/13 6:59 PM:
----------------------------------------------------------

Looks fine. If possible, can we reduce the code duplication?

{code}
+    FutureTask<Void> initInputFuture = new FutureTask<Void>(new InitInputCallable(input));
+    FutureTask<Void> initOutputFuture = new FutureTask<Void>(new InitOutputCallable(out));
+    new Thread(initInputFuture, "InitInputThread").start();
+    new Thread(initOutputFuture, "InitOututThread").start();

...

+
+    // Wait for input/output to be initialized before starting processing.
+    LOG.info("Waiting on input initialization");
+    waitForIOInitialization(initInputFuture);
+    sortPhase = getProgress().addPhase("sort");
+    reducePhase = getProgress().addPhase("reduce");
+    sortPhase.complete(); // sort is complete
+    setPhase(TezTaskStatus.Phase.REDUCE);

{code}

The above code exists in both the old and new API code paths.
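
One possible way to remove the duplication is to move the thread start-up and the wait into a small shared helper that both code paths call. The sketch below is only an illustration: the class `IoInitializer` and its method names are hypothetical and not part of the TEZ-376 patch, and plain `Callable<Void>` arguments stand in for the patch's `InitInputCallable` and `InitOutputCallable`. The phase bookkeeping (`sortPhase`, `reducePhase`, `setPhase`) is left out because it depends on Tez classes, but it could move into the same shared method.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/** Hypothetical helper shared by both code paths; not taken from the patch. */
final class IoInitializer {
  private final FutureTask<Void> inputFuture;
  private final FutureTask<Void> outputFuture;

  IoInitializer(Callable<Void> initInput, Callable<Void> initOutput) {
    this.inputFuture = new FutureTask<Void>(initInput);
    this.outputFuture = new FutureTask<Void>(initOutput);
  }

  /** Starts input and output initialization, each on its own thread. */
  void start() {
    new Thread(inputFuture, "InitInputThread").start();
    new Thread(outputFuture, "InitOutputThread").start();
  }

  /** Blocks until input initialization has finished. */
  void awaitInput() {
    await(inputFuture, "input");
  }

  /** Blocks until output initialization has finished. */
  void awaitOutput() {
    await(outputFuture, "output");
  }

  private static void await(FutureTask<Void> future, String what) {
    try {
      future.get();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe it.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted waiting for " + what + " initialization", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(what + " initialization failed", e.getCause());
    }
  }
}
```

With a helper like this, each code path would construct the initializer once, call `start()`, and later call `awaitInput()` before the phase updates, so the four `FutureTask`/`Thread` lines are written only once.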
                
> Reduce Processor can parallelize I/O/Processor initialization
> -------------------------------------------------------------
>
>                 Key: TEZ-376
>                 URL: https://issues.apache.org/jira/browse/TEZ-376
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: Siddharth Seth
>            Assignee: Siddharth Seth
>         Attachments: TEZ-376.txt
>
>
> Reported by [~gopalv]. Reduce initialization can take a fair amount of time after the shuffle is complete, depending on how expensive the Reduce initialization is.
> Quote from Gopal's investigation:
> 1.6 seconds is spent in ReflectionUtils.newInstance() -> ReflectionUtils.setConf(), of which 1.2 seconds is Utilities.getMapRedWork().
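
As a rough illustration of the idea in the issue title, the sketch below runs several independent initialization steps concurrently, so the total wait is closer to the slowest step than to the sum of all steps. It assumes the steps do not depend on one another. The class `ParallelInitSketch` and the method `initializeAll` are hypothetical names, and the sketch uses an `ExecutorService` rather than the explicit threads in the patch.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch; not the approach taken in the TEZ-376 patch. */
final class ParallelInitSketch {
  /**
   * Runs every step on its own pool thread and returns once all have
   * finished. The first failure found is rethrown as IllegalStateException.
   */
  static void initializeAll(List<Callable<Void>> steps) {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, steps.size()));
    try {
      // invokeAll blocks until every step has completed or failed.
      for (Future<Void> result : pool.invokeAll(steps)) {
        result.get(); // surfaces a step's exception, if any
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted during initialization", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Initialization step failed", e.getCause());
    } finally {
      pool.shutdown();
    }
  }
}
```

In this shape, an expensive processor set-up such as the 1.6 second reflection cost quoted above would be passed as one of the steps alongside input and output initialization.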
