You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/07/24 09:37:51 UTC
svn commit: r1692461 - in /hama/trunk: CHANGES.txt
core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
core/src/main/java/org/apache/hama/ipc/AsyncServer.java
Author: edwardyoon
Date: Fri Jul 24 07:37:51 2015
New Revision: 1692461
URL: http://svn.apache.org/r1692461
Log:
HAMA-966: NioServerListener doesn't throw any exceptions (JongYoon Lim via edwardyoon)
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1692461&r1=1692460&r2=1692461&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Jul 24 07:37:51 2015
@@ -7,6 +7,7 @@ Release 0.7.1 (unreleased changes)
BUG FIXES
HAMA-965: Infinite loop because of recursive function call (JongYoon Lim via edwardyoon)
+ HAMA-966: NioServerListener doesn't throw any exceptions (JongYoon Lim via edwardyoon)
IMPROVEMENTS
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java?rev=1692461&r1=1692460&r2=1692461&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java Fri Jul 24 07:37:51 2015
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -100,12 +101,20 @@ public final class HamaAsyncMessageManag
server.start();
LOG.info("BSPPeer address:" + server.getAddress().getHostName()
+ " port:" + server.getAddress().getPort());
- } catch (BindException e) {
- LOG.warn("Address already in use. Retrying " + hostName + ":" + port + 1);
- if (retry++ >= MAX_RETRY) {
- throw new RuntimeException("RPC Server could not be launched!");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ if (e.getCause() instanceof BindException) {
+ final int nextPort = port + 1;
+ LOG.warn("Address already in use. Retrying " + hostName + ":"
+ + nextPort);
+ if (retry++ >= MAX_RETRY) {
+ throw new RuntimeException("RPC Server could not be launched!");
+ }
+ startServer(hostName, nextPort);
}
- startServer(hostName, port + 1);
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java?rev=1692461&r1=1692460&r2=1692461&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java Fri Jul 24 07:37:51 2015
@@ -19,34 +19,13 @@ package org.apache.hama.ipc;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.ReferenceCountUtil;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
+import io.netty.util.concurrent.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -60,6 +39,17 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import java.io.*;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.Future;
+
/**
* An abstract IPC service using netty. IPC calls take a single {@link Writable}
* as a parameter, and return a {@link Writable}*
@@ -171,54 +161,62 @@ public abstract class AsyncServer {
}
/** start server listener */
- public void start() {
- new NioServerListener().start();
+ public void start() throws ExecutionException, InterruptedException {
+ ExecutorService es = Executors.newSingleThreadExecutor();
+ Future<ChannelFuture> future = es.submit(new NioServerListener());
+ try {
+ ChannelFuture closeFuture = future.get();
+ closeFuture
+ .addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
+ @Override
+ public void operationComplete(
+ io.netty.util.concurrent.Future<Void> voidFuture)
+ throws Exception {
+ // Stop the server gracefully if it's not terminated.
+ stop();
+ }
+ });
+ } finally {
+ es.shutdown();
+ }
}
- private class NioServerListener extends Thread {
+ private class NioServerListener implements Callable<ChannelFuture> {
/**
* Configure and start nio server
*/
@Override
- public void run() {
+ public ChannelFuture call() throws Exception {
SERVER.set(AsyncServer.this);
- try {
- // ServerBootstrap is a helper class that sets up a server
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, backlogLength)
- .childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT)
- .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
- .childOption(ChannelOption.SO_KEEPALIVE, true)
- .childOption(ChannelOption.SO_RCVBUF, 30 * 1024 * 1024)
- .childOption(ChannelOption.RCVBUF_ALLOCATOR,
- new FixedRecvByteBufAllocator(100 * 1024))
-
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline p = ch.pipeline();
- // Register accumulation processing handler
- p.addLast(new NioFrameDecoder(100 * 1024 * 1024, 0, 4, 0, 0));
- // Register message processing handler
- p.addLast(new NioServerInboundHandler());
- }
- });
-
- // Bind and start to accept incoming connections.
- ChannelFuture f = b.bind(port).sync();
- LOG.info("AsyncServer startup");
- // Wait until the server socket is closed.
- f.channel().closeFuture().sync();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- // Shut down Server gracefully
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
+ // ServerBootstrap is a helper class that sets up a server
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .option(ChannelOption.SO_BACKLOG, backlogLength)
+ .childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT)
+ .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childOption(ChannelOption.SO_RCVBUF, 30 * 1024 * 1024)
+ .childOption(ChannelOption.RCVBUF_ALLOCATOR,
+ new FixedRecvByteBufAllocator(100 * 1024))
+
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ // Register accumulation processing handler
+ p.addLast(new NioFrameDecoder(100 * 1024 * 1024, 0, 4, 0, 0));
+ // Register message processing handler
+ p.addLast(new NioServerInboundHandler());
+ }
+ });
+
+ // Bind and start to accept incoming connections.
+ ChannelFuture f = b.bind(port).sync();
+ LOG.info("AsyncServer startup");
+
+ return f.channel().closeFuture();
}
}