You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/31 23:31:44 UTC

[1/5] git commit: upgrade Netty from 4.0.0.Beta2 to 4.0.13.Final

Updated Branches:
  refs/heads/master 55b7e2fdf -> 63b411dd8


upgrade Netty from 4.0.0.Beta2 to 4.0.13.Final


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/040dd3ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/040dd3ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/040dd3ec

Branch: refs/heads/master
Commit: 040dd3ecd5a7668ecbb6cf0b611318c1010f9bdb
Parents: 3bf7c70
Author: Binh Nguyen <ng...@gmail.com>
Authored: Sat Dec 7 00:44:10 2013 -0800
Committer: Binh Nguyen <ng...@gmail.com>
Committed: Tue Dec 24 14:58:18 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/network/netty/FileClient.java  | 15 ++++++----
 .../netty/FileClientChannelInitializer.java     |  5 ++--
 .../spark/network/netty/FileClientHandler.java  | 12 ++------
 .../apache/spark/network/netty/FileServer.java  | 29 ++++++++++++++------
 .../spark/network/netty/FileServerHandler.java  |  8 +++---
 pom.xml                                         |  2 +-
 project/SparkBuild.scala                        |  2 +-
 7 files changed, 42 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/040dd3ec/core/src/main/java/org/apache/spark/network/netty/FileClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
index edd0fc5..95e25d9 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelOption;
 import io.netty.channel.oio.OioEventLoopGroup;
 import io.netty.channel.socket.oio.OioSocketChannel;
 
+import io.netty.util.concurrent.EventExecutorGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,6 +33,7 @@ class FileClient {
   private FileClientHandler handler = null;
   private Channel channel = null;
   private Bootstrap bootstrap = null;
+  private EventLoopGroup group = null;
   private int connectTimeout = 60*1000; // 1 min
 
   public FileClient(FileClientHandler handler, int connectTimeout) {
@@ -40,8 +42,9 @@ class FileClient {
   }
 
   public void init() {
-    bootstrap = new Bootstrap();
-    bootstrap.group(new OioEventLoopGroup())
+    group = new OioEventLoopGroup();
+    Bootstrap bootstrap = new Bootstrap();
+    bootstrap.group(group)
       .channel(OioSocketChannel.class)
       .option(ChannelOption.SO_KEEPALIVE, true)
       .option(ChannelOption.TCP_NODELAY, true)
@@ -76,11 +79,13 @@ class FileClient {
 
   public void close() {
     if(channel != null) {
-      channel.close();
+      channel.close().awaitUninterruptibly();
       channel = null;
     }
-    if ( bootstrap!=null) {
-      bootstrap.shutdown();
+
+    if (group!=null) {
+      group.shutdownGracefully();
+      group = null;
       bootstrap = null;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/040dd3ec/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
index 65ee15d..b162250 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.network.netty;
 
-import io.netty.buffer.BufType;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.string.StringEncoder;
@@ -25,7 +24,7 @@ import io.netty.handler.codec.string.StringEncoder;
 
 class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
 
-  private FileClientHandler fhandler;
+  private final FileClientHandler fhandler;
 
   public FileClientChannelInitializer(FileClientHandler handler) {
     fhandler = handler;
@@ -35,7 +34,7 @@ class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
   public void initChannel(SocketChannel channel) {
     // file no more than 2G
     channel.pipeline()
-      .addLast("encoder", new StringEncoder(BufType.BYTE))
+      .addLast("encoder", new StringEncoder())
       .addLast("handler", fhandler);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/040dd3ec/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java
index 8a09210..63d3d92 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java
@@ -19,11 +19,11 @@ package org.apache.spark.network.netty;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundByteHandlerAdapter;
+import io.netty.channel.SimpleChannelInboundHandler;
 
 import org.apache.spark.storage.BlockId;
 
-abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
+abstract class FileClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
 
   private FileHeader currentHeader = null;
 
@@ -37,13 +37,7 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
   public abstract void handleError(BlockId blockId);
 
   @Override
-  public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) {
-    // Use direct buffer if possible.
-    return ctx.alloc().ioBuffer();
-  }
-
-  @Override
-  public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
+  public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
     // get header
     if (currentHeader == null && in.readableBytes() >= FileHeader.HEADER_SIZE()) {
       currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE()));

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/040dd3ec/core/src/main/java/org/apache/spark/network/netty/FileServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
index a99af34..a7305cd 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
@@ -22,6 +22,9 @@ import java.net.InetSocketAddress;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoop;
+import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.oio.OioEventLoopGroup;
 import io.netty.channel.socket.oio.OioServerSocketChannel;
 
@@ -36,7 +39,8 @@ class FileServer {
 
   private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
 
-  private ServerBootstrap bootstrap = null;
+  private EventLoopGroup bossGroup = null;
+  private EventLoopGroup workerGroup = null;
   private ChannelFuture channelFuture = null;
   private int port = 0;
   private Thread blockingThread = null;
@@ -45,8 +49,11 @@ class FileServer {
     InetSocketAddress addr = new InetSocketAddress(port);
 
     // Configure the server.
-    bootstrap = new ServerBootstrap();
-    bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
+    bossGroup = new OioEventLoopGroup();
+    workerGroup = new OioEventLoopGroup();
+
+    ServerBootstrap bootstrap = new ServerBootstrap();
+    bootstrap.group(bossGroup, workerGroup)
         .channel(OioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 100)
         .option(ChannelOption.SO_RCVBUF, 1500)
@@ -89,13 +96,19 @@ class FileServer {
   public void stop() {
     // Close the bound channel.
     if (channelFuture != null) {
-      channelFuture.channel().close();
+      channelFuture.channel().close().awaitUninterruptibly();
       channelFuture = null;
     }
-    // Shutdown bootstrap.
-    if (bootstrap != null) {
-      bootstrap.shutdown();
-      bootstrap = null;
+
+    // Shutdown event groups
+    if (bossGroup != null) {
+       bossGroup.shutdownGracefully();
+       bossGroup = null;
+    }
+
+    if (workerGroup != null) {
+       workerGroup.shutdownGracefully();
+       workerGroup = null;
     }
     // TODO: Shutdown all accepted channels as well ?
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/040dd3ec/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index 172c6e4..f3009b4 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -21,13 +21,13 @@ import java.io.File;
 import java.io.FileInputStream;
 
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundMessageHandlerAdapter;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.DefaultFileRegion;
 
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.storage.FileSegment;
 
-class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
+class FileServerHandler extends SimpleChannelInboundHandler<String> {
 
   PathResolver pResolver;
 
@@ -36,7 +36,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
   }
 
   @Override
-  public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
+  public void channelRead0(ChannelHandlerContext ctx, String blockIdString) {
     BlockId blockId = BlockId.apply(blockIdString);
     FileSegment fileSegment = pResolver.getBlockLocation(blockId);
     // if getBlockLocation returns null, close the channel
@@ -60,7 +60,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
       int len = new Long(length).intValue();
       ctx.write((new FileHeader(len, blockId)).buffer());
       try {
-        ctx.sendFile(new DefaultFileRegion(new FileInputStream(file)
+        ctx.write(new DefaultFileRegion(new FileInputStream(file)
           .getChannel(), fileSegment.offset(), fileSegment.length()));
       } catch (Exception e) {
         e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/040dd3ec/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 57e8435..0936ae5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -282,7 +282,7 @@
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
-        <version>4.0.0.CR1</version>
+        <version>4.0.13.Final</version>
       </dependency>
       <dependency>
         <groupId>org.apache.derby</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/040dd3ec/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 7bcbd90..1df1abc 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -178,7 +178,7 @@ object SparkBuild extends Build {
 
 
     libraryDependencies ++= Seq(
-        "io.netty"          % "netty-all"       % "4.0.0.CR1",
+        "io.netty"          % "netty-all"       % "4.0.13.Final",
         "org.eclipse.jetty" % "jetty-server"    % "7.6.8.v20121106",
         /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
         "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),


[2/5] git commit: Fix imports order

Posted by pw...@apache.org.
Fix imports order


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/786f393a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/786f393a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/786f393a

Branch: refs/heads/master
Commit: 786f393a98f8771d0c20322cd50e553a895c7d60
Parents: 9115a5d
Author: Binh Nguyen <ng...@gmail.com>
Authored: Sat Dec 7 11:06:18 2013 -0800
Committer: Binh Nguyen <ng...@gmail.com>
Committed: Tue Dec 24 14:59:30 2013 -0800

----------------------------------------------------------------------
 .../spark/network/netty/FileClientChannelInitializer.java       | 1 -
 .../main/java/org/apache/spark/network/netty/FileServer.java    | 5 ++---
 .../spark/network/netty/FileServerChannelInitializer.java       | 1 -
 3 files changed, 2 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/786f393a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
index b162250..fb61be1 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java
@@ -21,7 +21,6 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.string.StringEncoder;
 
-
 class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
 
   private final FileClientHandler fhandler;

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/786f393a/core/src/main/java/org/apache/spark/network/netty/FileServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
index 3fe7d69..aea7534 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.network.netty;
 
+import java.net.InetSocketAddress;
+
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
@@ -26,9 +28,6 @@ import io.netty.channel.socket.oio.OioServerSocketChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
-
-
 /**
  * Server that accept the path of a file an echo back its content.
  */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/786f393a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
index 833af16..f1f264c 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
@@ -23,7 +23,6 @@ import io.netty.handler.codec.DelimiterBasedFrameDecoder;
 import io.netty.handler.codec.Delimiters;
 import io.netty.handler.codec.string.StringDecoder;
 
-
 class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
 
   PathResolver pResolver;


[3/5] git commit: Remove import * and fix some formatting

Posted by pw...@apache.org.
Remove import * and fix some formatting


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/9115a5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/9115a5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/9115a5de

Branch: refs/heads/master
Commit: 9115a5de62dcb832569727773112a4688ef63f03
Parents: 040dd3e
Author: Binh Nguyen <ng...@gmail.com>
Authored: Sat Dec 7 09:15:31 2013 -0800
Committer: Binh Nguyen <ng...@gmail.com>
Committed: Tue Dec 24 14:59:30 2013 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/spark/network/netty/FileClient.java  | 4 ++--
 .../main/java/org/apache/spark/network/netty/FileServer.java  | 7 ++-----
 2 files changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9115a5de/core/src/main/java/org/apache/spark/network/netty/FileClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
index 95e25d9..6b7f6a9 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
@@ -20,10 +20,10 @@ package org.apache.spark.network.netty;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
 import io.netty.channel.oio.OioEventLoopGroup;
 import io.netty.channel.socket.oio.OioSocketChannel;
 
-import io.netty.util.concurrent.EventExecutorGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,7 +83,7 @@ class FileClient {
       channel = null;
     }
 
-    if (group!=null) {
+    if (group != null) {
       group.shutdownGracefully();
       group = null;
       bootstrap = null;

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9115a5de/core/src/main/java/org/apache/spark/network/netty/FileServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
index a7305cd..3fe7d69 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java
@@ -17,20 +17,17 @@
 
 package org.apache.spark.network.netty;
 
-import java.net.InetSocketAddress;
-
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoop;
-import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.oio.OioEventLoopGroup;
 import io.netty.channel.socket.oio.OioServerSocketChannel;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
+
 
 /**
  * Server that accept the path of a file an echo back its content.


[5/5] git commit: Merge pull request #238 from ngbinh/upgradeNetty

Posted by pw...@apache.org.
Merge pull request #238 from ngbinh/upgradeNetty

upgrade Netty from 4.0.0.Beta2 to 4.0.13.Final

the changes are listed at https://github.com/netty/netty/wiki/New-and-noteworthy


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/63b411dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/63b411dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/63b411dd

Branch: refs/heads/master
Commit: 63b411dd8664c27ac55586d8345733afad80961f
Parents: 55b7e2f 2c5bade
Author: Patrick Wendell <pw...@gmail.com>
Authored: Tue Dec 31 14:31:28 2013 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Dec 31 14:31:28 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/network/netty/FileClient.java  | 32 ++++++++++++++------
 .../netty/FileClientChannelInitializer.java     |  6 ++--
 .../spark/network/netty/FileClientHandler.java  | 12 ++------
 .../apache/spark/network/netty/FileServer.java  | 29 ++++++++++++------
 .../netty/FileServerChannelInitializer.java     |  3 +-
 .../spark/network/netty/FileServerHandler.java  | 18 ++++++-----
 pom.xml                                         |  2 +-
 project/SparkBuild.scala                        |  2 +-
 8 files changed, 60 insertions(+), 44 deletions(-)
----------------------------------------------------------------------



[4/5] git commit: Fix failed unit tests

Posted by pw...@apache.org.
Fix failed unit tests

    Also clean up a bit.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2c5bade4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2c5bade4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2c5bade4

Branch: refs/heads/master
Commit: 2c5bade4ee6db747cbc7b0884094ad443834e3b1
Parents: 786f393
Author: Binh Nguyen <ng...@gmail.com>
Authored: Fri Dec 27 11:18:27 2013 -0800
Committer: Binh Nguyen <ng...@gmail.com>
Committed: Fri Dec 27 11:24:30 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/network/netty/FileClient.java  | 25 +++++++++++++-------
 .../netty/FileServerChannelInitializer.java     |  2 +-
 .../spark/network/netty/FileServerHandler.java  | 10 +++++---
 3 files changed, 24 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2c5bade4/core/src/main/java/org/apache/spark/network/netty/FileClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
index 6b7f6a9..46d6150 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java
@@ -27,14 +27,17 @@ import io.netty.channel.socket.oio.OioSocketChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.TimeUnit;
+
 class FileClient {
 
   private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
-  private FileClientHandler handler = null;
+  private final FileClientHandler handler;
   private Channel channel = null;
   private Bootstrap bootstrap = null;
   private EventLoopGroup group = null;
-  private int connectTimeout = 60*1000; // 1 min
+  private final int connectTimeout;
+  private final int sendTimeout = 60; // 1 min
 
   public FileClient(FileClientHandler handler, int connectTimeout) {
     this.handler = handler;
@@ -43,7 +46,7 @@ class FileClient {
 
   public void init() {
     group = new OioEventLoopGroup();
-    Bootstrap bootstrap = new Bootstrap();
+    bootstrap = new Bootstrap();
     bootstrap.group(group)
       .channel(OioSocketChannel.class)
       .option(ChannelOption.SO_KEEPALIVE, true)
@@ -59,6 +62,7 @@ class FileClient {
       // ChannelFuture cf = channel.closeFuture();
       //cf.addListener(new ChannelCloseListener(this));
     } catch (InterruptedException e) {
+      LOG.warn("FileClient interrupted while trying to connect", e);
       close();
     }
   }
@@ -74,15 +78,18 @@ class FileClient {
   public void sendRequest(String file) {
     //assert(file == null);
     //assert(channel == null);
-    channel.write(file + "\r\n");
+      try {
+          // Should be able to send the message to network link channel.
+          boolean bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS);
+          if (!bSent) {
+              throw new RuntimeException("Failed to send");
+          }
+      } catch (InterruptedException e) {
+          LOG.error("Error", e);
+      }
   }
 
   public void close() {
-    if(channel != null) {
-      channel.close().awaitUninterruptibly();
-      channel = null;
-    }
-
     if (group != null) {
       group.shutdownGracefully();
       group = null;

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2c5bade4/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
index f1f264c..3f15ff8 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java
@@ -35,7 +35,7 @@ class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
   public void initChannel(SocketChannel channel) {
     channel.pipeline()
       .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
-      .addLast("strDecoder", new StringDecoder())
+      .addLast("stringDecoder", new StringDecoder())
       .addLast("handler", new FileServerHandler(pResolver));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2c5bade4/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
index f3009b4..e2d9391 100644
--- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
+++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java
@@ -26,10 +26,14 @@ import io.netty.channel.DefaultFileRegion;
 
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.storage.FileSegment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class FileServerHandler extends SimpleChannelInboundHandler<String> {
 
-  PathResolver pResolver;
+  private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+
+  private final PathResolver pResolver;
 
   public FileServerHandler(PathResolver pResolver){
     this.pResolver = pResolver;
@@ -63,7 +67,7 @@ class FileServerHandler extends SimpleChannelInboundHandler<String> {
         ctx.write(new DefaultFileRegion(new FileInputStream(file)
           .getChannel(), fileSegment.offset(), fileSegment.length()));
       } catch (Exception e) {
-        e.printStackTrace();
+          LOG.error("Exception: ", e);
       }
     } else {
       ctx.write(new FileHeader(0, blockId).buffer());
@@ -73,7 +77,7 @@ class FileServerHandler extends SimpleChannelInboundHandler<String> {
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    cause.printStackTrace();
+    LOG.error("Exception: ", cause);
     ctx.close();
   }
 }