You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/02/23 19:01:07 UTC

git commit: TEZ-874. Fetcher inputstream is not buffered. (Rajesh Balamohan via hitesh)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 1c112c103 -> 52cfa784d


TEZ-874. Fetcher inputstream is not buffered. (Rajesh Balamohan via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/52cfa784
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/52cfa784
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/52cfa784

Branch: refs/heads/master
Commit: 52cfa784ddf5db0e696f4f145b3537eddfaca079
Parents: 1c112c1
Author: Hitesh Shah <hi...@apache.org>
Authored: Sun Feb 23 09:59:28 2014 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Sun Feb 23 09:59:28 2014 -0800

----------------------------------------------------------------------
 tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java | 7 +++++++
 .../tez/runtime/library/common/shuffle/impl/Fetcher.java      | 7 ++++++-
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52cfa784/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index 444a830..a3d6561 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -217,6 +217,13 @@ public class TezJobConfig {
   public static final String TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT = "tez.runtime.shuffle.read.timeout";
   public final static int DEFAULT_TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT = 
       3 * 60 * 1000;
+  
+  /**
+   * Size, in bytes, of the buffer used when reading shuffle data from the fetcher's input stream.
+   */
+  public static final String TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE = "tez.runtime.shuffle.buffersize";
+  public final static int DEFAULT_TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE = 
+      8 * 1024;
 
   /**
    * 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/52cfa784/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index e8038a4..042e8f5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -17,6 +17,7 @@
  */
 package org.apache.tez.runtime.library.common.shuffle.impl;
 
+import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -78,6 +79,7 @@ class Fetcher extends Thread {
   
   private final int connectionTimeout;
   private final int readTimeout;
+  private final int bufferSize;
   
   // Decompression of map-outputs
   private final CompressionCodec codec;
@@ -137,6 +139,9 @@ class Fetcher extends Thread {
     this.readTimeout = 
         job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 
             TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
+    
+    this.bufferSize = job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE, 
+            TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
 
     setName("fetcher#" + id);
     setDaemon(true);
@@ -262,7 +267,7 @@ class Fetcher extends Thread {
           ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
       connect(connection, connectionTimeout);
       connectSucceeded = true;
-      input = new DataInputStream(connection.getInputStream());
+      input = new DataInputStream(new BufferedInputStream(connection.getInputStream(), bufferSize));
 
       // Validate response code
       int rc = connection.getResponseCode();