You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/07/24 09:37:51 UTC

svn commit: r1692461 - in /hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java core/src/main/java/org/apache/hama/ipc/AsyncServer.java

Author: edwardyoon
Date: Fri Jul 24 07:37:51 2015
New Revision: 1692461

URL: http://svn.apache.org/r1692461
Log:
HAMA-966: NioServerListener doesn't throw any exceptions (JongYoon Lim via edwardyoon)

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1692461&r1=1692460&r2=1692461&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Jul 24 07:37:51 2015
@@ -7,6 +7,7 @@ Release 0.7.1 (unreleased changes)
   BUG FIXES
 
     HAMA-965: Infinite loop because of recursive function call (JongYoon Lim via edwardyoon)
+    HAMA-966: NioServerListener doesn't throw any exceptions (JongYoon Lim via edwardyoon)
 
   IMPROVEMENTS
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java?rev=1692461&r1=1692460&r2=1692461&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java Fri Jul 24 07:37:51 2015
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -100,12 +101,20 @@ public final class HamaAsyncMessageManag
       server.start();
       LOG.info("BSPPeer address:" + server.getAddress().getHostName()
           + " port:" + server.getAddress().getPort());
-    } catch (BindException e) {
-      LOG.warn("Address already in use. Retrying " + hostName + ":" + port + 1);
-      if (retry++ >= MAX_RETRY) {
-        throw new RuntimeException("RPC Server could not be launched!");
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      e.printStackTrace();
+      if (e.getCause() instanceof BindException) {
+        final int nextPort = port + 1;
+        LOG.warn("Address already in use. Retrying " + hostName + ":"
+            + nextPort);
+        if (retry++ >= MAX_RETRY) {
+          throw new RuntimeException("RPC Server could not be launched!");
+        }
+        startServer(hostName, nextPort);
       }
-      startServer(hostName, port + 1);
     }
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java?rev=1692461&r1=1692460&r2=1692461&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java Fri Jul 24 07:37:51 2015
@@ -19,34 +19,13 @@ package org.apache.hama.ipc;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.*;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.util.ReferenceCountUtil;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
+import io.netty.util.concurrent.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -60,6 +39,17 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import java.io.*;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.Future;
+
 /**
  * An abstract IPC service using netty. IPC calls take a single {@link Writable}
  * as a parameter, and return a {@link Writable}*
@@ -171,54 +161,62 @@ public abstract class AsyncServer {
   }
 
   /** start server listener */
-  public void start() {
-    new NioServerListener().start();
+  public void start() throws ExecutionException, InterruptedException {
+    ExecutorService es = Executors.newSingleThreadExecutor();
+    Future<ChannelFuture> future = es.submit(new NioServerListener());
+    try {
+      ChannelFuture closeFuture = future.get();
+      closeFuture
+          .addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
+            @Override
+            public void operationComplete(
+                io.netty.util.concurrent.Future<Void> voidFuture)
+                throws Exception {
+              // Stop the server gracefully if it's not terminated.
+              stop();
+            }
+          });
+    } finally {
+      es.shutdown();
+    }
   }
 
-  private class NioServerListener extends Thread {
+  private class NioServerListener implements Callable<ChannelFuture> {
 
     /**
      * Configure and start nio server
      */
     @Override
-    public void run() {
+    public ChannelFuture call() throws Exception {
       SERVER.set(AsyncServer.this);
-      try {
-        // ServerBootstrap is a helper class that sets up a server
-        ServerBootstrap b = new ServerBootstrap();
-        b.group(bossGroup, workerGroup)
-            .channel(NioServerSocketChannel.class)
-            .option(ChannelOption.SO_BACKLOG, backlogLength)
-            .childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT)
-            .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
-            .childOption(ChannelOption.SO_KEEPALIVE, true)
-            .childOption(ChannelOption.SO_RCVBUF, 30 * 1024 * 1024)
-            .childOption(ChannelOption.RCVBUF_ALLOCATOR,
-                new FixedRecvByteBufAllocator(100 * 1024))
-
-            .childHandler(new ChannelInitializer<SocketChannel>() {
-              @Override
-              public void initChannel(SocketChannel ch) throws Exception {
-                ChannelPipeline p = ch.pipeline();
-                // Register accumulation processing handler
-                p.addLast(new NioFrameDecoder(100 * 1024 * 1024, 0, 4, 0, 0));
-                // Register message processing handler
-                p.addLast(new NioServerInboundHandler());
-              }
-            });
-
-        // Bind and start to accept incoming connections.
-        ChannelFuture f = b.bind(port).sync();
-        LOG.info("AsyncServer startup");
-        // Wait until the server socket is closed.
-        f.channel().closeFuture().sync();
-      } catch (Exception e) {
-        e.printStackTrace();
-      } finally {
-        // Shut down Server gracefully
-        bossGroup.shutdownGracefully();
-        workerGroup.shutdownGracefully();
-      }
+      // ServerBootstrap is a helper class that sets up a server
+      ServerBootstrap b = new ServerBootstrap();
+      b.group(bossGroup, workerGroup)
+          .channel(NioServerSocketChannel.class)
+          .option(ChannelOption.SO_BACKLOG, backlogLength)
+          .childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT)
+          .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
+          .childOption(ChannelOption.SO_KEEPALIVE, true)
+          .childOption(ChannelOption.SO_RCVBUF, 30 * 1024 * 1024)
+          .childOption(ChannelOption.RCVBUF_ALLOCATOR,
+              new FixedRecvByteBufAllocator(100 * 1024))
+
+          .childHandler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel ch) throws Exception {
+              ChannelPipeline p = ch.pipeline();
+              // Register accumulation processing handler
+              p.addLast(new NioFrameDecoder(100 * 1024 * 1024, 0, 4, 0, 0));
+              // Register message processing handler
+              p.addLast(new NioServerInboundHandler());
+            }
+          });
+
+      // Bind and start to accept incoming connections.
+      ChannelFuture f = b.bind(port).sync();
+      LOG.info("AsyncServer startup");
+
+      return f.channel().closeFuture();
     }
   }