You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by bs...@apache.org on 2014/09/30 02:44:15 UTC
svn commit: r1628344 [1/2] - in /hama/trunk/core: ./
src/main/java/org/apache/hama/bsp/message/ src/main/java/org/apache/hama/ipc/
src/test/java/org/apache/hama/bsp/message/ src/test/java/org/apache/hama/ipc/
Author: bsmin
Date: Tue Sep 30 00:44:14 2014
New Revision: 1628344
URL: http://svn.apache.org/r1628344
Log:
HAMA-913: add RPC implementation using Netty
Added:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java (with props)
hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncClient.java (with props)
hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncRPC.java (with props)
hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java (with props)
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java (with props)
hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java (with props)
hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java (with props)
Modified:
hama/trunk/core/pom.xml
Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1628344&r1=1628343&r2=1628344&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Tue Sep 30 00:44:14 2014
@@ -139,6 +139,11 @@
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>4.0.21.Final</version>
+ </dependency>
</dependencies>
<build>
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java?rev=1628344&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java Tue Sep 30 00:44:14 2014
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.BSPPeerImpl;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
+import org.apache.hama.ipc.AsyncRPC;
+import org.apache.hama.ipc.AsyncServer;
+import org.apache.hama.util.LRUCache;
+
+/**
+ * Implementation of the {@link HamaMessageManager}.
+ *
+ */
+public final class HamaAsyncMessageManagerImpl<M extends Writable> extends
+ AbstractMessageManager<M> implements HamaMessageManager<M> {
+
+ private static final Log LOG = LogFactory
+ .getLog(HamaAsyncMessageManagerImpl.class);
+
+ private AsyncServer server;
+
+ private LRUCache<InetSocketAddress, HamaMessageManager<M>> peersLRUCache = null;
+
+ @SuppressWarnings("serial")
+ @Override
+ public final void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
+ HamaConfiguration conf, InetSocketAddress peerAddress) {
+ super.init(attemptId, peer, conf, peerAddress);
+ startRPCServer(conf, peerAddress);
+ peersLRUCache = new LRUCache<InetSocketAddress, HamaMessageManager<M>>(
+ maxCachedConnections) {
+ @Override
+ protected final boolean removeEldestEntry(
+ Map.Entry<InetSocketAddress, HamaMessageManager<M>> eldest) {
+ if (size() > this.capacity) {
+ HamaMessageManager<M> proxy = eldest.getValue();
+ AsyncRPC.stopProxy(proxy);
+ return true;
+ }
+ return false;
+ }
+ };
+ }
+
+ private final void startRPCServer(Configuration conf,
+ InetSocketAddress peerAddress) {
+ try {
+ startServer(peerAddress.getHostName(), peerAddress.getPort());
+ } catch (IOException ioe) {
+ LOG.error("Fail to start RPC server!", ioe);
+ throw new RuntimeException("RPC Server could not be launched!");
+ }
+ }
+
+ private void startServer(String hostName, int port) throws IOException {
+ int retry = 0;
+ try {
+ this.server = AsyncRPC.getServer(this, hostName, port,
+ conf.getInt("hama.default.messenger.handler.threads.num", 5), false,
+ conf);
+
+ server.start();
+ LOG.info("BSPPeer address:" + server.getAddress().getHostName()
+ + " port:" + server.getAddress().getPort());
+ } catch (BindException e) {
+ LOG.warn("Address already in use. Retrying " + hostName + ":" + port + 1);
+ startServer(hostName, port + 1);
+ retry++;
+
+ if (retry > 5) {
+ throw new RuntimeException("RPC Server could not be launched!");
+ }
+ }
+ }
+
+ @Override
+ public final void close() {
+ super.close();
+ if (server != null) {
+ server.stop();
+ }
+ }
+
+ @Override
+ public final void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
+ throws IOException {
+ HamaMessageManager<M> bspPeerConnection = this.getBSPPeerConnection(addr);
+ if (bspPeerConnection == null) {
+ throw new IllegalArgumentException("Can not find " + addr.toString()
+ + " to transfer messages to!");
+ } else {
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+ bundle.getLength());
+ bspPeerConnection.put(bundle);
+ }
+ }
+
+ /**
+ * @param addr socket address to which BSP Peer Connection will be established
+ * @return BSP Peer Connection, tried to return cached connection, else
+ * returns a new connection and caches it
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ protected final HamaMessageManager<M> getBSPPeerConnection(
+ InetSocketAddress addr) throws IOException {
+ HamaMessageManager<M> bspPeerConnection;
+ if (!peersLRUCache.containsKey(addr)) {
+ bspPeerConnection = (HamaMessageManager<M>) AsyncRPC.getProxy(
+ HamaMessageManager.class, HamaRPCProtocolVersion.versionID, addr,
+ this.conf);
+ peersLRUCache.put(addr, bspPeerConnection);
+ } else {
+ bspPeerConnection = peersLRUCache.get(addr);
+ }
+ return bspPeerConnection;
+ }
+
+ @Override
+ public final void put(M msg) throws IOException {
+ loopBackMessage(msg);
+ }
+
+ @Override
+ public final void put(BSPMessageBundle<M> bundle) throws IOException {
+ loopBackBundle(bundle);
+ }
+
+ @Override
+ public final long getProtocolVersion(String arg0, long arg1)
+ throws IOException {
+ return versionID;
+ }
+
+ @Override
+ public InetSocketAddress getListenerAddress() {
+ if (this.server != null) {
+ return this.server.getAddress();
+ }
+ return null;
+ }
+
+}
Propchange: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncClient.java?rev=1628344&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncClient.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncClient.java Tue Sep 30 00:44:14 2014
@@ -0,0 +1,1138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.ReferenceCountUtil;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.BSPNetUtils;
+
+/**
+ * A client for an IPC service using netty. IPC calls take a single
+ * {@link Writable} as a parameter, and return a {@link Writable} as their
+ * value. A service runs on a port and is defined by a parameter class and a
+ * value class.
+ *
+ * @see AsyncClient
+ */
+public class AsyncClient {
+ private static final String IPC_CLIENT_CONNECT_MAX_RETRIES_KEY = "ipc.client.connect.max.retries";
+ private static final int IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT = 10;
+ private static final Log LOG = LogFactory.getLog(AsyncClient.class);
+ private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();
+
+ private Class<? extends Writable> valueClass; // class of call values
+ private int counter = 0; // counter for call ids
+ private AtomicBoolean running = new AtomicBoolean(true); // if client runs
+ final private Configuration conf; // configuration obj
+
+ private SocketFactory socketFactory; // only use in order to meet the
+ // consistency with other clients
+ private int refCount = 1;
+
+ final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
+ final static int DEFAULT_PING_INTERVAL = 60000; // 1 min
+
+ /**
+ * set the ping interval value in configuration
+ *
+ * @param conf Configuration
+ * @param pingInterval the ping interval
+ */
+ final public static void setPingInterval(Configuration conf, int pingInterval) {
+ conf.setInt(PING_INTERVAL_NAME, pingInterval);
+ }
+
+ /**
+ * Get the ping interval from configuration; If not set in the configuration,
+ * return the default value.
+ *
+ * @param conf Configuration
+ * @return the ping interval
+ */
+ final static int getPingInterval(Configuration conf) {
+ return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
+ }
+
+ /**
+ * The time after which a RPC will timeout. If ping is not enabled (via
+ * ipc.client.ping), then the timeout value is the same as the pingInterval.
+ * If ping is enabled, then there is no timeout value.
+ *
+ * @param conf Configuration
+ * @return the timeout period in milliseconds. -1 if no timeout value is set
+ */
+ final public static int getTimeout(Configuration conf) {
+ if (!conf.getBoolean("ipc.client.ping", true)) {
+ return getPingInterval(conf);
+ }
+ return -1;
+ }
+
+ /**
+ * Increment this client's reference count
+ *
+ */
+ synchronized void incCount() {
+ refCount++;
+ }
+
+ /**
+ * Decrement this client's reference count
+ *
+ */
+ synchronized void decCount() {
+ refCount--;
+ }
+
+ /**
+ * Return if this client has no reference
+ *
+ * @return true if this client has no reference; false otherwise
+ */
+ synchronized boolean isZeroReference() {
+ return refCount == 0;
+ }
+
+ /**
+ * Thread that reads responses and notifies callers. Each connection owns a
+ * socket connected to a remote address. Calls are multiplexed through this
+ * socket: responses may be delivered out of order.
+ */
+ private class Connection {
+ private InetSocketAddress serverAddress; // server ip:port
+ private ConnectionHeader header; // connection header
+ private final ConnectionId remoteId; // connection id
+ private AuthMethod authMethod; // authentication method
+
+ private EventLoopGroup group;
+ private Bootstrap bootstrap;
+ private Channel channel;
+ private int rpcTimeout;
+ private int maxIdleTime; // connections will be culled if it was idle
+
+ private final RetryPolicy connectionRetryPolicy;
+ private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
+ private int pingInterval; // how often sends ping to the server in msecs
+
+ // currently active calls
+ private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
+ private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate
+ private IOException closeException; // if the connection is closed, close
+ // reason
+
+ /**
+ * Setup Connection Configuration
+ *
+ * @param remoteId remote connection Id
+ * @throws IOException
+ */
+ public Connection(ConnectionId remoteId) throws IOException {
+ group = new NioEventLoopGroup();
+ bootstrap = new Bootstrap();
+ this.remoteId = remoteId;
+ this.serverAddress = remoteId.getAddress();
+ if (serverAddress.isUnresolved()) {
+ throw new UnknownHostException("unknown host: "
+ + remoteId.getAddress().getHostName());
+ }
+ this.maxIdleTime = remoteId.getMaxIdleTime();
+ this.connectionRetryPolicy = remoteId.connectionRetryPolicy;
+ this.tcpNoDelay = remoteId.getTcpNoDelay();
+ this.pingInterval = remoteId.getPingInterval();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The ping interval is" + this.pingInterval + "ms.");
+ }
+ this.rpcTimeout = remoteId.getRpcTimeout();
+ Class<?> protocol = remoteId.getProtocol();
+
+ authMethod = AuthMethod.SIMPLE;
+ header = new ConnectionHeader(protocol == null ? null
+ : protocol.getName(), null, authMethod);
+ }
+
+ /**
+ * Add a call to this connection's call queue and notify a listener;
+ * synchronized. Returns false if called during shutdown.
+ *
+ * @param call to add
+ * @return true if the call was added.
+ */
+ private synchronized boolean addCall(Call call) {
+ if (shouldCloseConnection.get())
+ return false;
+ calls.put(call.id, call);
+ notify();
+ return true;
+ }
+
+ /**
+ * Update the server address if the address corresponding to the host name
+ * has changed.
+ */
+ private synchronized boolean updateAddress() throws IOException {
+ // Do a fresh lookup with the old host name.
+ InetSocketAddress currentAddr = BSPNetUtils.makeSocketAddr(
+ serverAddress.getHostName(), serverAddress.getPort());
+
+ if (!serverAddress.equals(currentAddr)) {
+ LOG.warn("Address change detected. Old: " + serverAddress.toString()
+ + " New: " + currentAddr.toString());
+ serverAddress = currentAddr;
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Connect to the server and set up the I/O streams. It then sends a header
+ * to the server.
+ */
+ private void setupIOstreams() throws InterruptedException {
+ if (channel != null && channel.isActive()) {
+ return;
+ }
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to " + serverAddress);
+ }
+
+ setupConnection();
+ writeHeader();
+ } catch (Throwable t) {
+ if (t instanceof IOException) {
+ markClosed((IOException) t);
+ } else {
+ markClosed(new IOException("Couldn't set up IO streams", t));
+ }
+ close();
+ }
+ }
+
+ /**
+ * Configure the client and connect to server
+ */
+ private void setupConnection() throws Exception {
+ while (true) {
+ short ioFailures = 0;
+ try {
+ // rpcTimeout overwrites pingInterval
+ if (rpcTimeout > 0) {
+ pingInterval = rpcTimeout;
+ }
+
+ // Configure the client.
+ // NioEventLoopGroup is a multithreaded event loop that handles I/O
+ // operation
+ group = new NioEventLoopGroup();
+ // Bootstrap is a helper class that sets up a client
+ bootstrap = new Bootstrap();
+ bootstrap.group(group).channel(NioSocketChannel.class)
+ .option(ChannelOption.TCP_NODELAY, this.tcpNoDelay)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, pingInterval)
+ .option(ChannelOption.SO_SNDBUF, 30 * 1024 * 1024)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(new IdleStateHandler(0, 0, maxIdleTime));
+ // Register message processing handler
+ p.addLast(new NioClientInboundHandler());
+ }
+ });
+
+ // Bind and start to accept incoming connections.
+ ChannelFuture channelFuture = bootstrap.connect(
+ serverAddress.getAddress(), serverAddress.getPort()).sync();
+ // Get io channel
+ channel = channelFuture.channel();
+ LOG.info("AsyncClient startup");
+ break;
+ } catch (Exception ie) {
+ /*
+ * Check for an address change and update the local reference. Reset
+ * the failure counter if the address was changed
+ */
+
+ if (updateAddress()) {
+ ioFailures = 0;
+ }
+ handleConnectionFailure(ioFailures++, ie);
+ }
+ }
+ }
+
+ /**
+ * Write the header protocol header for each connection Out is not
+ * synchronized because only the first thread does this.
+ *
+ * @param channel
+ */
+ private void writeHeader() {
+ DataOutputBuffer rpcBuff = null;
+ DataOutputBuffer headerBuf = null;
+ try {
+ ByteBuf buf = channel.alloc().buffer();
+ rpcBuff = new DataOutputBuffer();
+ authMethod.write(rpcBuff);
+
+ headerBuf = new DataOutputBuffer();
+ header.write(headerBuf);
+ byte[] data = headerBuf.getData();
+ int dataLength = headerBuf.getLength();
+ // write rpcheader
+ buf.writeInt(AsyncServer.HEADER_LENGTH + dataLength);
+ buf.writeBytes(AsyncServer.HEADER.array());
+ buf.writeByte(AsyncServer.CURRENT_VERSION);
+ buf.writeByte(rpcBuff.getData()[0]);
+ // write header
+ buf.writeInt(dataLength);
+ buf.writeBytes(data, 0, dataLength);
+
+ channel.writeAndFlush(buf);
+ } catch (Exception e) {
+ LOG.error("Couldn't send header" + e);
+ } finally {
+ IOUtils.closeStream(rpcBuff);
+ IOUtils.closeStream(headerBuf);
+ }
+ }
+
+ /**
+ * close the current connection gracefully.
+ */
+ private void closeConnection() {
+ try {
+ if (!this.group.isTerminated()) {
+ this.group.shutdownGracefully();
+ LOG.info("client gracefully shutdown");
+ }
+ } catch (Exception e) {
+ LOG.warn("Not able to close a client", e);
+ }
+ }
+
+ /**
+ * This class process received response message from server.
+ */
+ private class NioClientInboundHandler extends ChannelInboundHandlerAdapter {
+
+ /**
+ * Receive a response. This method is called with the received response
+ * message, whenever new data is received from a server.
+ *
+ * @param ctx
+ * @param cause
+ */
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ ByteBuf byteBuf = (ByteBuf) msg;
+ ByteBufInputStream byteBufInputStream = new ByteBufInputStream(byteBuf);
+ DataInputStream in = new DataInputStream(byteBufInputStream);
+ while (true) {
+ try {
+ if (in.available() <= 0)
+ break;
+ // try to read an id
+ int id = in.readInt();
+
+ if (LOG.isDebugEnabled())
+ LOG.debug(serverAddress.getHostName() + " got value #" + id);
+
+ Call call = calls.get(id);
+
+ // read call status
+ int state = in.readInt();
+ if (state == Status.SUCCESS.state) {
+ Writable value = ReflectionUtils.newInstance(valueClass, conf);
+ value.readFields(in); // read value
+ call.setValue(value);
+ calls.remove(id);
+ } else if (state == Status.ERROR.state) {
+ String className = WritableUtils.readString(in);
+ byte[] errorBytes = new byte[in.available()];
+ in.readFully(errorBytes);
+ call.setException(new RemoteException(className, new String(
+ errorBytes)));
+ calls.remove(id);
+ } else if (state == Status.FATAL.state) {
+ // Close the connection
+ markClosed(new RemoteException(WritableUtils.readString(in),
+ WritableUtils.readString(in)));
+ } else {
+ byte[] garbageBytes = new byte[in.available()];
+ in.readFully(garbageBytes);
+ }
+ } catch (IOException e) {
+ markClosed(e);
+ }
+ }
+ IOUtils.closeStream(in);
+ IOUtils.closeStream(byteBufInputStream);
+ ReferenceCountUtil.release(msg);
+ }
+
+ /**
+ * Ths event handler method is called with a Throwable due to an I/O
+ * error. Then, exception is logged and its associated channel is closed
+ * here
+ *
+ * @param ctx
+ * @param cause
+ */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ LOG.error("Occured I/O Error : " + cause.getMessage());
+ ctx.close();
+ }
+
+ /**
+ * this method is triggered after a long reading/writing/idle time, it is
+ * marked as to be closed, or the client is marked as not running.
+ *
+ * @param ctx
+ * @param evt
+ */
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
+ if (evt instanceof IdleStateEvent) {
+ IdleStateEvent e = (IdleStateEvent) evt;
+ if (e.state() != IdleState.ALL_IDLE) {
+ if (!calls.isEmpty() && !shouldCloseConnection.get()
+ && running.get()) {
+ return;
+ } else if (shouldCloseConnection.get()) {
+ markClosed(null);
+ } else if (calls.isEmpty()) { // idle connection closed or stopped
+ markClosed(null);
+ } else { // get stopped but there are still pending requests
+ markClosed((IOException) new IOException()
+ .initCause(new InterruptedException()));
+ }
+ closeConnection();
+ }
+ }
+
+ }
+ }
+
+ /**
+ * Handle connection failures If the current number of retries is equal to
+ * the max number of retries, stop retrying and throw the exception;
+ * Otherwise backoff 1 second and try connecting again. This Method is only
+ * called from inside setupIOstreams(), which is synchronized. Hence the
+ * sleep is synchronized; the locks will be retained.
+ *
+ * @param curRetries current number of retries
+ * @param maxRetries max number of retries allowed
+ * @param ioe failure reason
+ * @throws IOException if max number of retries is reached
+ */
+ @SuppressWarnings("unused")
+ private void handleConnectionFailure(int curRetries, int maxRetries,
+ IOException ioe) throws IOException {
+
+ closeConnection();
+
+ // throw the exception if the maximum number of retries is reached
+ if (curRetries >= maxRetries) {
+ throw ioe;
+ }
+
+ // otherwise back off and retry
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {
+ }
+
+ LOG.info("Retrying connect to server: " + serverAddress
+ + ". Already tried " + curRetries + " time(s); maxRetries="
+ + maxRetries);
+ }
+
+ /*
+ * Handle connection failures If the current number of retries, stop
+ * retrying and throw the exception; Otherwise backoff 1 second and try
+ * connecting again. This Method is only called from inside
+ * setupIOstreams(), which is synchronized. Hence the sleep is synchronized;
+ * the locks will be retained.
+ * @param curRetries current number of retries
+ * @param ioe failure reason
+ * @throws Exception if max number of retries is reached
+ */
+ private void handleConnectionFailure(int curRetries, Exception ioe)
+ throws Exception {
+ closeConnection();
+
+ final boolean retry;
+ try {
+ retry = connectionRetryPolicy.shouldRetry(ioe, curRetries);
+ } catch (Exception e) {
+ throw e instanceof IOException ? (IOException) e : new IOException(e);
+ }
+ if (!retry) {
+ throw ioe;
+ }
+
+ LOG.info("Retrying connect to server: " + serverAddress
+ + ". Already tried " + curRetries + " time(s); retry policy is "
+ + connectionRetryPolicy);
+ }
+
+ /**
+ * Return the remote address of server
+ *
+ * @return remote server address
+ */
+ public InetSocketAddress getRemoteAddress() {
+ return serverAddress;
+ }
+
+ /**
+ * Initiates a call by sending the parameter to the remote server.
+ *
+ * @param sendCall
+ */
+ public void sendParam(Call sendCall) {
+ if (LOG.isDebugEnabled())
+ LOG.debug(this.getClass().getName() + " sending #" + sendCall.id);
+ DataOutputBuffer buff = null;
+ try {
+ buff = new DataOutputBuffer();
+ buff.writeInt(sendCall.id);
+ sendCall.param.write(buff);
+ byte[] data = buff.getData();
+ int dataLength = buff.getLength();
+ ByteBuf buf = channel.alloc().buffer();
+
+ buf.writeInt(dataLength);
+ buf.writeBytes(data, 0, dataLength);
+ ChannelFuture channelFuture = channel.writeAndFlush(buf);
+ if (channelFuture.cause() != null) {
+ throw channelFuture.cause();
+ }
+ } catch (IOException ioe) {
+ markClosed(ioe);
+ } catch (Throwable t) {
+ markClosed(new IOException(t));
+ } finally {
+ // the buffer is just an in-memory buffer, but it is still
+ // polite to close early
+ IOUtils.closeStream(buff);
+ }
+ }
+
+ /**
+ * Mark the connection to be closed
+ *
+ * @param ioe
+ **/
+ private synchronized void markClosed(IOException ioe) {
+ if (shouldCloseConnection.compareAndSet(false, true)) {
+ closeException = ioe;
+ notifyAll();
+ }
+ }
+
+ /** Close the connection. */
+ private synchronized void close() {
+ if (!shouldCloseConnection.get()) {
+ LOG.error("The connection is not in the closed state");
+ return;
+ }
+
+ // release the resources
+ // first thing to do;take the connection out of the connection list
+ synchronized (connections) {
+ if (connections.get(remoteId) == this) {
+ Connection connection = connections.remove(remoteId);
+ connection.closeConnection();
+ }
+ }
+
+ // clean up all calls
+ if (closeException == null) {
+ if (!calls.isEmpty()) {
+ LOG.warn("A connection is closed for no cause and calls are not empty");
+
+ // clean up calls anyway
+ closeException = new IOException("Unexpected closed connection");
+ cleanupCalls();
+ }
+ } else {
+ // log the info
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("closing ipc connection to " + serverAddress + ": "
+ + closeException.getMessage(), closeException);
+ }
+
+ // cleanup calls
+ cleanupCalls();
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug(serverAddress.getHostName() + ": closed");
+ }
+
+ /** Cleanup all calls and mark them as done */
+ private void cleanupCalls() {
+ Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
+ while (itor.hasNext()) {
+ Call c = itor.next().getValue();
+ c.setException(closeException); // local exception
+ itor.remove();
+ }
+ }
+ }
+
+ /** A call waiting for a value. */
+ private class Call {
+ int id; // call id
+ Writable param; // parameter
+ Writable value; // value, null if error
+ IOException error; // exception, null if value
+ boolean done; // true when call is done
+
+ protected Call(Writable param) {
+ this.param = param;
+ synchronized (AsyncClient.this) {
+ this.id = counter++;
+ }
+ }
+
+ /**
+ * Indicate when the call is complete and the value or error are available.
+ * Notifies by default.
+ */
+ protected synchronized void callComplete() {
+ this.done = true;
+ notify(); // notify caller
+ }
+
+ /**
+ * Set the exception when there is an error. Notify the caller the call is
+ * done.
+ *
+ * @param error exception thrown by the call; either local or remote
+ */
+ public synchronized void setException(IOException error) {
+ this.error = error;
+ this.callComplete();
+ }
+
+ /**
+ * Set the return value when there is no error. Notify the caller the call
+ * is done.
+ *
+ * @param value return value of the call.
+ */
+ public synchronized void setValue(Writable value) {
+ this.value = value;
+ callComplete();
+ }
+ }
+
+ /** Call implementation used for parallel calls. */
+ private class ParallelCall extends Call {
+ private ParallelResults results;
+ private int index;
+
+ public ParallelCall(Writable param, ParallelResults results, int index) {
+ super(param);
+ this.results = results;
+ this.index = index;
+ }
+
+ @Override
+ /** Deliver result to result collector. */
+ protected void callComplete() {
+ results.callComplete(this);
+ }
+ }
+
+ /** Result collector for parallel calls. */
+ private static class ParallelResults {
+ private Writable[] values;
+ private int size;
+ private int count;
+
+ public ParallelResults(int size) {
+ this.values = new Writable[size];
+ this.size = size;
+ }
+
+ /**
+ * Collect a result.
+ *
+ * @param call
+ */
+ public synchronized void callComplete(ParallelCall call) {
+ values[call.index] = call.value; // store the value
+ count++; // count it
+ if (count == size) // if all values are in
+ notify(); // then notify waiting caller
+ }
+ }
+
+ /**
+ * Construct an IPC client whose values are of the given {@link Writable}
+ * class.
+ *
+ * @param valueClass
+ * @param conf
+ * @param factory
+ */
+ public AsyncClient(Class<? extends Writable> valueClass, Configuration conf,
+ SocketFactory factory) {
+ this.valueClass = valueClass;
+ this.conf = conf;
+ // SocketFactory only use in order to meet the consistency with other
+ // clients
+ this.socketFactory = factory;
+ }
+
+ /**
+ * Construct an IPC client with the default SocketFactory
+ *
+ * @param valueClass
+ * @param conf
+ */
+ public AsyncClient(Class<? extends Writable> valueClass, Configuration conf) {
+ // SocketFactory only use in order to meet the consistency with other
+ // clients
+ this(valueClass, conf, BSPNetUtils.getDefaultSocketFactory(conf));
+ }
+
+ /**
+ * Return the socket factory of this client
+ *
+ * @return this client's socket factory
+ */
+ SocketFactory getSocketFactory() {
+ // SocketFactory only use in order to meet the consistency with other
+ // clients
+ return socketFactory;
+ }
+
+ /**
+ * Stop all threads related to this client. No further calls may be made using
+ * this client.
+ */
+ public void stop() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping client");
+ }
+
+ if (!running.compareAndSet(true, false)) {
+ return;
+ }
+
+ // wake up all connections
+ synchronized (connections) {
+ for (Connection conn : connections.values()) {
+ conn.closeConnection();
+ }
+ }
+ }
+
+ /**
+ * Make a call, passing <code>param</code>, to the IPC server running at
+ * <code>address</code> which is servicing the <code>protocol</code> protocol,
+ * with the <code>ticket</code> credentials, <code>rpcTimeout</code> as
+ * timeout and <code>conf</code> as configuration for this connection,
+ * returning the value. Throws exceptions if there are network problems or if
+ * the remote code threw an exception.
+ *
+ * @param param
+ * @param addr
+ * @param protocol
+ * @param ticket
+ * @param rpcTimeout
+ * @param conf
+ * @return Response Writable value
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public Writable call(Writable param, InetSocketAddress addr,
+ Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
+ Configuration conf) throws InterruptedException, IOException {
+ ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
+ ticket, rpcTimeout, conf);
+ return call(param, remoteId);
+ }
+
+ /**
+ * Make a call, passing <code>param</code>, to the IPC server defined by
+ * <code>remoteId</code>, returning the value. Throws exceptions if there are
+ * network problems or if the remote code threw an exception.
+ *
+ * @param param
+ * @param remoteId
+ * @return Response Writable value
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public Writable call(Writable param, ConnectionId remoteId)
+ throws InterruptedException, IOException {
+ Call call = new Call(param);
+
+ Connection connection = getConnection(remoteId, call);
+
+ connection.sendParam(call); // send the parameter
+ boolean interrupted = false;
+
+ synchronized (call) {
+ int callFailCount = 0;
+ while (!call.done) {
+ try {
+ call.wait(1000); // wait for the result
+ // prevent client hang from response error
+ if (callFailCount++ == IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT)
+ break;
+ } catch (InterruptedException ie) {
+ interrupted = true;
+ }
+ }
+
+ if (interrupted) {
+ // set the interrupt flag now that we are done waiting
+ Thread.currentThread().interrupt();
+
+ }
+
+ if (call.error != null) {
+ if (call.error instanceof RemoteException) {
+ call.error.fillInStackTrace();
+ throw call.error;
+ } else { // local exception
+ // use the connection because it will reflect an ip change,
+ // unlike
+ // the remoteId
+ throw wrapException(connection.getRemoteAddress(), call.error);
+ }
+ } else {
+ return call.value;
+ }
+ }
+ }
+
+ /**
+ * Take an IOException and the address we were trying to connect to and return
+ * an IOException with the input exception as the cause. The new exception
+ * provides the stack trace of the place where the exception is thrown and
+ * some extra diagnostics information. If the exception is ConnectException or
+ * SocketTimeoutException, return a new one of the same type; Otherwise return
+ * an IOException.
+ *
+ * @param addr target address
+ * @param exception the relevant exception
+ * @return an exception to throw
+ */
+ private IOException wrapException(InetSocketAddress addr,
+ IOException exception) {
+ if (exception instanceof ConnectException) {
+ // connection refused; include the host:port in the error
+ return (ConnectException) new ConnectException("Call to " + addr
+ + " failed on connection exception: " + exception)
+ .initCause(exception);
+ } else if (exception instanceof SocketTimeoutException) {
+ return (SocketTimeoutException) new SocketTimeoutException("Call to "
+ + addr + " failed on socket timeout exception: " + exception)
+ .initCause(exception);
+ } else {
+ return (IOException) new IOException("Call to " + addr
+ + " failed on local exception: " + exception).initCause(exception);
+
+ }
+ }
+
+ /**
+ * Makes a set of calls in parallel. Each parameter is sent to the
+ * corresponding address. When all values are available, or have timed out or
+ * errored, the collected results are returned in an array. The array contains
+ * nulls for calls that timed out or errored.
+ *
+ * @param params
+ * @param addresses
+ * @param protocol
+ * @param ticket
+ * @param conf
+ * @return Response Writable value array
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
+ Class<?> protocol, UserGroupInformation ticket, Configuration conf)
+ throws IOException, InterruptedException {
+ if (addresses.length == 0)
+ return new Writable[0];
+
+ ParallelResults results = new ParallelResults(params.length);
+ ConnectionId remoteId[] = new ConnectionId[addresses.length];
+ synchronized (results) {
+ for (int i = 0; i < params.length; i++) {
+ ParallelCall call = new ParallelCall(params[i], results, i);
+ try {
+ remoteId[i] = ConnectionId.getConnectionId(addresses[i], protocol,
+ ticket, 0, conf);
+ Connection connection = getConnection(remoteId[i], call);
+ connection.sendParam(call); // send each parameter
+ } catch (IOException e) {
+ // log errors
+ LOG.info("Calling " + addresses[i] + " caught: " + e.getMessage(), e);
+ results.size--; // wait for one fewer result
+ }
+ }
+
+ while (results.count != results.size) {
+ try {
+ results.wait(); // wait for all results
+ } catch (InterruptedException e) {
+ }
+ }
+
+ return results.values;
+ }
+ }
+
+ // for unit testing only
+ Set<ConnectionId> getConnectionIds() {
+ synchronized (connections) {
+ return connections.keySet();
+ }
+ }
+
+ /**
+ * Get a connection from the pool, or create a new one and add it to the pool.
+ * Connections to a given ConnectionId are reused.
+ *
+ * @param remoteId
+ * @param call
+ * @return connection
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private synchronized Connection getConnection(ConnectionId remoteId, Call call)
+ throws IOException, InterruptedException {
+ if (!running.get()) {
+ // the client is stopped
+ throw new IOException("The client is stopped");
+ }
+ Connection connection;
+ /*
+ * we could avoid this allocation for each RPC by having a connectionsId
+ * object and with set() method. We need to manage the refs for keys in
+ * HashMap properly. For now its ok.
+ */
+ do {
+ connection = connections.get(remoteId);
+ if (connection == null) {
+ connection = new Connection(remoteId);
+ connections.put(remoteId, connection);
+ } else if (!connection.channel.isWritable()
+ || !connection.channel.isActive()) {
+ connection = new Connection(remoteId);
+ connections.remove(remoteId);
+ connections.put(remoteId, connection);
+ }
+ } while (!connection.addCall(call));
+ // we don't invoke the method below inside "synchronized (connections)"
+ // block above. The reason for that is if the server happens to be slow,
+ // it will take longer to establish a connection and that will slow the
+ // entire system down.
+
+ connection.setupIOstreams();
+ return connection;
+ }
+
+ /**
+ * This class holds the address and the user ticket. The client connections to
+ * servers are uniquely identified by <remoteAddress, protocol, ticket>
+ */
+ static class ConnectionId {
+ InetSocketAddress address;
+ UserGroupInformation ticket;
+ Class<?> protocol;
+ private static final int PRIME = 16777619;
+ private int rpcTimeout;
+ private String serverPrincipal;
+ private int maxIdleTime; // connections will be culled if it was idle for
+ // maxIdleTime msecs
+ private final RetryPolicy connectionRetryPolicy;
+ private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
+ private int pingInterval; // how often sends ping to the server in msecs
+
+ ConnectionId(InetSocketAddress address, Class<?> protocol,
+ UserGroupInformation ticket, int rpcTimeout, String serverPrincipal,
+ int maxIdleTime, RetryPolicy connectionRetryPolicy, boolean tcpNoDelay,
+ int pingInterval) {
+ this.protocol = protocol;
+ this.address = address;
+ this.ticket = ticket;
+ this.rpcTimeout = rpcTimeout;
+ this.serverPrincipal = serverPrincipal;
+ this.maxIdleTime = maxIdleTime;
+ this.connectionRetryPolicy = connectionRetryPolicy;
+ this.tcpNoDelay = tcpNoDelay;
+ this.pingInterval = pingInterval;
+ }
+
+ InetSocketAddress getAddress() {
+ return address;
+ }
+
+ Class<?> getProtocol() {
+ return protocol;
+ }
+
+ private int getRpcTimeout() {
+ return rpcTimeout;
+ }
+
+ String getServerPrincipal() {
+ return serverPrincipal;
+ }
+
+ int getMaxIdleTime() {
+ return maxIdleTime;
+ }
+
+ boolean getTcpNoDelay() {
+ return tcpNoDelay;
+ }
+
+ int getPingInterval() {
+ return pingInterval;
+ }
+
+ static ConnectionId getConnectionId(InetSocketAddress addr,
+ Class<?> protocol, UserGroupInformation ticket, Configuration conf)
+ throws IOException {
+ return getConnectionId(addr, protocol, ticket, 0, conf);
+ }
+
+ static ConnectionId getConnectionId(InetSocketAddress addr,
+ Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
+ Configuration conf) throws IOException {
+ return getConnectionId(addr, protocol, ticket, rpcTimeout, null, conf);
+ }
+
+ static ConnectionId getConnectionId(InetSocketAddress addr,
+ Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
+ RetryPolicy connectionRetryPolicy, Configuration conf)
+ throws IOException {
+
+ if (connectionRetryPolicy == null) {
+ final int max = conf.getInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+ IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT);
+ connectionRetryPolicy = RetryPolicies
+ .retryUpToMaximumCountWithFixedSleep(max, 1, TimeUnit.SECONDS);
+ }
+
+ return new ConnectionId(addr, protocol, ticket, rpcTimeout,
+ null,
+ conf.getInt("ipc.client.connection.maxidletime", 10000), // 10s
+ connectionRetryPolicy,
+ conf.getBoolean("ipc.client.tcpnodelay", true),
+ AsyncClient.getPingInterval(conf));
+ }
+
+ static boolean isEqual(Object a, Object b) {
+ return a == null ? b == null : a.equals(b);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj instanceof ConnectionId) {
+ ConnectionId that = (ConnectionId) obj;
+ return isEqual(this.address, that.address)
+ && this.maxIdleTime == that.maxIdleTime
+ && isEqual(this.connectionRetryPolicy, that.connectionRetryPolicy)
+ && this.pingInterval == that.pingInterval
+ && isEqual(this.protocol, that.protocol)
+ && this.rpcTimeout == that.rpcTimeout
+ && isEqual(this.serverPrincipal, that.serverPrincipal)
+ && this.tcpNoDelay == that.tcpNoDelay
+ && isEqual(this.ticket, that.ticket);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = connectionRetryPolicy.hashCode();
+ result = PRIME * result + ((address == null) ? 0 : address.hashCode());
+ result = PRIME * result + maxIdleTime;
+ result = PRIME * result + pingInterval;
+ result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
+ result = PRIME * rpcTimeout;
+ result = PRIME * result
+ + ((serverPrincipal == null) ? 0 : serverPrincipal.hashCode());
+ result = PRIME * result + (tcpNoDelay ? 1231 : 1237);
+ result = PRIME * result + ((ticket == null) ? 0 : ticket.hashCode());
+ return result;
+ }
+ }
+}
Propchange: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncRPC.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncRPC.java?rev=1628344&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncRPC.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncRPC.java Tue Sep 30 00:44:14 2014
@@ -0,0 +1,777 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hama.util.BSPNetUtils;
+
+/**
+ * A simple RPC mechanism using netty.
+ *
+ * A <i>protocol</i> is a Java interface. All parameters and return types must
+ * be one of:
+ *
+ * <ul>
+ * <li>a primitive type, <code>boolean</code>, <code>byte</code>,
+ * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
+ * <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
+ *
+ * <li>a {@link String}; or</li>
+ *
+ * <li>a {@link Writable}; or</li>
+ *
+ * <li>an array of the above types</li>
+ * </ul>
+ *
+ * All methods in the protocol should throw only IOException. No field data of
+ * the protocol instance is transmitted.
+ */
+public class AsyncRPC {
+ private static final Log LOG = LogFactory.getLog(AsyncRPC.class);
+
+ private AsyncRPC() {
+ } // no public ctor
+
+ /** A method invocation, including the method name and its parameters. */
+ @SuppressWarnings("rawtypes")
+ private static class Invocation implements Writable, Configurable {
+ private String methodName;
+ private Class[] parameterClasses;
+ private Object[] parameters;
+ private Configuration conf;
+
+ @SuppressWarnings("unused")
+ public Invocation() {
+ }
+
+ /**
+ *
+ * @param method
+ * @param parameters
+ */
+ public Invocation(Method method, Object[] parameters) {
+ this.methodName = method.getName();
+ this.parameterClasses = method.getParameterTypes();
+ this.parameters = parameters;
+ }
+
+ /** The name of the method invoked. */
+ public String getMethodName() {
+ return methodName;
+ }
+
+ /** The parameter classes. */
+ public Class[] getParameterClasses() {
+ return parameterClasses;
+ }
+
+ /** The parameter instances. */
+ public Object[] getParameters() {
+ return parameters;
+ }
+
+ /**
+ *
+ * @param in
+ */
+ public void readFields(DataInput in) throws IOException {
+ methodName = Text.readString(in);
+ parameters = new Object[in.readInt()];
+ parameterClasses = new Class[parameters.length];
+ ObjectWritable objectWritable = new ObjectWritable();
+ for (int i = 0; i < parameters.length; i++) {
+ parameters[i] = ObjectWritable
+ .readObject(in, objectWritable, this.conf);
+ parameterClasses[i] = objectWritable.getDeclaredClass();
+ }
+ }
+
+ /**
+ *
+ * @param out
+ */
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, methodName);
+ out.writeInt(parameterClasses.length);
+ for (int i = 0; i < parameterClasses.length; i++) {
+ ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
+ conf);
+ }
+ }
+
+ public String toString() {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(methodName);
+ buffer.append("(");
+ for (int i = 0; i < parameters.length; i++) {
+ if (i != 0)
+ buffer.append(", ");
+ buffer.append(parameters[i]);
+ }
+ buffer.append(")");
+ return buffer.toString();
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ }
+
+ /** Cache a client using its socket factory as the hash key */
+ static private class ClientCache {
+ private Map<SocketFactory, AsyncClient> clients = new HashMap<SocketFactory, AsyncClient>();
+
+ /**
+ * Construct & cache an IPC client with the user-provided SocketFactory if
+ * no cached client exists.
+ *
+ * @param conf Configuration
+ * @return an IPC client
+ */
+ private synchronized AsyncClient getClient(Configuration conf,
+ SocketFactory factory) {
+ // Construct & cache client. The configuration is only used for timeout,
+ // and Clients have connection pools. So we can either (a) lose some
+ // connection pooling and leak sockets, or (b) use the same timeout for
+ // all
+ // configurations. Since the IPC is usually intended globally, not
+ // per-job, we choose (a).
+ AsyncClient client = clients.get(factory);
+ if (client == null) {
+ client = new AsyncClient(ObjectWritable.class, conf, factory);
+ clients.put(factory, client);
+ } else {
+ client.incCount();
+ }
+ return client;
+ }
+
+ /**
+ * Construct & cache an IPC client with the default SocketFactory if no
+ * cached client exists.
+ *
+ * @param conf Configuration
+ * @return an IPC client
+ */
+ private synchronized AsyncClient getClient(Configuration conf) {
+ return getClient(conf, SocketFactory.getDefault());
+ }
+
+ /**
+ * Stop a RPCWithNetty client connection A RPCWithNetty client is closed
+ * only when its reference count becomes zero.
+ *
+ * @param client
+ */
+ private void stopClient(AsyncClient client) {
+ synchronized (this) {
+ client.decCount();
+ if (client.isZeroReference()) {
+ clients.remove(client.getSocketFactory());
+ }
+ }
+ if (client.isZeroReference()) {
+ client.stop();
+ }
+ }
+ }
+
+ private static ClientCache CLIENTS = new ClientCache();
+
+ /**
+ * for unit testing only
+ *
+ * @param conf
+ * @return
+ */
+ static AsyncClient getClient(Configuration conf) {
+ return CLIENTS.getClient(conf);
+ }
+
+ /**
+ *
+ */
+ private static class Invoker implements InvocationHandler {
+ private AsyncClient.ConnectionId remoteId;
+ private AsyncClient client;
+ private boolean isClosed = false;
+
+ private Invoker(Class<? extends VersionedProtocol> protocol,
+ InetSocketAddress address, UserGroupInformation ticket,
+ Configuration conf, SocketFactory factory, int rpcTimeout,
+ RetryPolicy connectionRetryPolicy) throws IOException {
+ this.remoteId = AsyncClient.ConnectionId.getConnectionId(address,
+ protocol, ticket, rpcTimeout, connectionRetryPolicy, conf);
+ this.client = CLIENTS.getClient(conf, factory);
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args)
+ throws Throwable {
+ final boolean logDebug = LOG.isDebugEnabled();
+ long startTime = 0;
+ if (logDebug) {
+ startTime = System.currentTimeMillis();
+ }
+
+ ObjectWritable value = (ObjectWritable) client.call(new Invocation(
+ method, args), remoteId);
+ if (logDebug) {
+ long callTime = System.currentTimeMillis() - startTime;
+ LOG.debug("Call: " + method.getName() + " " + callTime);
+ }
+ return value.get();
+ }
+
+ /** close the RPCWithNetty client that's responsible for this invoker's RPCs */
+ synchronized private void close() {
+ if (!isClosed) {
+ isClosed = true;
+ CLIENTS.stopClient(client);
+ }
+ }
+ }
+
+ /**
+ * A version mismatch for the RPC protocol.
+ */
+ @SuppressWarnings("serial")
+ public static class VersionMismatch extends IOException {
+ private String interfaceName;
+ private long clientVersion;
+ private long serverVersion;
+
+ /**
+ * Create a version mismatch exception
+ *
+ * @param interfaceName the name of the protocol mismatch
+ * @param clientVersion the client's version of the protocol
+ * @param serverVersion the server's version of the protocol
+ */
+ public VersionMismatch(String interfaceName, long clientVersion,
+ long serverVersion) {
+ super("Protocol " + interfaceName + " version mismatch. (client = "
+ + clientVersion + ", server = " + serverVersion + ")");
+ this.interfaceName = interfaceName;
+ this.clientVersion = clientVersion;
+ this.serverVersion = serverVersion;
+ }
+
+ /**
+ * Get the interface name
+ *
+ * @return the java class name (eg.
+ * org.apache.hadoop.mapred.InterTrackerProtocol)
+ */
+ public String getInterfaceName() {
+ return interfaceName;
+ }
+
+ /**
+ * Get the client's preferred version
+ */
+ public long getClientVersion() {
+ return clientVersion;
+ }
+
+ /**
+ * Get the server's agreed to version.
+ */
+ public long getServerVersion() {
+ return serverVersion;
+ }
+ }
+
+ /**
+ * Get a proxy connection to a remote server
+ */
+ public static VersionedProtocol waitForProxy(
+ Class<? extends VersionedProtocol> protocol, long clientVersion,
+ InetSocketAddress addr, Configuration conf) throws IOException {
+ return waitForProxy(protocol, clientVersion, addr, conf, 0, Long.MAX_VALUE);
+ }
+
+ /**
+ * Get a proxy connection to a remote server
+ *
+ * @param protocol protocol class
+ * @param clientVersion client version
+ * @param addr remote address
+ * @param conf configuration to use
+ * @param connTimeout time in milliseconds before giving up
+ * @return the proxy
+ * @throws IOException if the far end through a RemoteException
+ */
+ static VersionedProtocol waitForProxy(
+ Class<? extends VersionedProtocol> protocol, long clientVersion,
+ InetSocketAddress addr, Configuration conf, long connTimeout)
+ throws IOException {
+ return waitForProxy(protocol, clientVersion, addr, conf, 0, connTimeout);
+ }
+
+ /**
+ * Get a proxy connection to a remote server
+ *
+ * @param protocol protocol class
+ * @param clientVersion client version
+ * @param addr remote address
+ * @param conf configuration to use
+ * @param rpcTimeout rpc timeout
+ * @param connTimeout time in milliseconds before giving up
+ * @return the proxy
+ * @throws IOException if the far end through a RemoteException
+ */
+ static VersionedProtocol waitForProxy(
+ Class<? extends VersionedProtocol> protocol, long clientVersion,
+ InetSocketAddress addr, Configuration conf, int rpcTimeout,
+ long connTimeout) throws IOException {
+ long startTime = System.currentTimeMillis();
+ IOException ioe;
+ while (true) {
+ try {
+ return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
+ } catch (ConnectException se) { // namenode has not been started
+ LOG.info("Server at " + addr + " not available yet, Zzzzz...");
+ ioe = se;
+ } catch (SocketTimeoutException te) { // namenode is busy
+ LOG.info("Problem connecting to server: " + addr);
+ ioe = te;
+ }
+ // check if timed out
+ if (System.currentTimeMillis() - connTimeout >= startTime) {
+ throw ioe;
+ }
+
+ // wait for retry
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ // IGNORE
+ }
+ }
+ }
+
+ /**
+ * Construct a client-side proxy object that implements the named protocol,
+ * talking to a server at the named address.
+ *
+ * @param protocol
+ * @param clientVersion
+ * @param addr
+ * @param conf
+ * @param factory
+ * @return the proxy
+ * @throws IOException
+ */
+ public static VersionedProtocol getProxy(
+ Class<? extends VersionedProtocol> protocol, long clientVersion,
+ InetSocketAddress addr, Configuration conf, SocketFactory factory)
+ throws IOException {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ return getProxy(protocol, clientVersion, addr, ugi, conf, factory, 0);
+ }
+
+ /**
+ * Construct a client-side proxy object that implements the named protocol,
+ * talking to a server at the named address.
+ *
+ * @param protocol
+ * @param clientVersion
+ * @param addr
+ * @param conf
+ * @param factory
+ * @param rpcTimeout
+ * @return the proxy
+ * @throws IOException
+ */
+ public static VersionedProtocol getProxy(
+ Class<? extends VersionedProtocol> protocol, long clientVersion,
+ InetSocketAddress addr, Configuration conf, SocketFactory factory,
+ int rpcTimeout) throws IOException {
+ return getProxy(protocol, clientVersion, addr, null, conf, factory,
+ rpcTimeout);
+ }
+
+ /**
+ * Construct a client-side proxy object that implements the named protocol,
+ * talking to a server at the named address.
+ *
+ * @param protocol
+ * @param clientVersion
+ * @param addr
+ * @param ticket
+ * @param conf
+ * @param factory
+ * @return the proxy
+ * @throws IOException
+ */
+ public static VersionedProtocol getProxy(
+ Class<? extends VersionedProtocol> protocol, long clientVersion,
+ InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+ SocketFactory factory) throws IOException {
+ return getProxy(protocol, clientVersion, addr, ticket, conf, factory, 0);
+ }
+
+ /**
+ * Construct a client-side proxy object that implements the named protocol,
+ * talking to a server at the named address.
+ *
+ * @param protocol
+ * @param clientVersion
+ * @param addr
+ * @param ticket
+ * @param conf
+ * @param factory
+ * @param rpcTimeout
+ * @return the proxy
+ * @throws IOException
+ */
+ public static VersionedProtocol getProxy(
+ Class<? extends VersionedProtocol> protocol, long clientVersion,
+ InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+ SocketFactory factory, int rpcTimeout) throws IOException {
+ return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+ rpcTimeout, null, true);
+ }
+
+ /**
+ * Construct a client-side proxy object that implements the named protocol,
+ * talking to a server at the named address.
+ *
+ * @param protocol
+ * @param clientVersion
+ * @param addr
+ * @param ticket
+ * @param conf
+ * @param factory
+ * @param rpcTimeout
+ * @param connectionRetryPolicy
+ * @param checkVersion
+ * @return the proxy
+ * @throws IOException
+ */
+ public static VersionedProtocol getProxy(
+ Class<? extends VersionedProtocol> protocol, long clientVersion,
+ InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+ SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
+ boolean checkVersion) throws IOException {
+
+ final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
+ rpcTimeout, connectionRetryPolicy);
+ VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(
+ protocol.getClassLoader(), new Class[] { protocol }, invoker);
+
+ if (checkVersion) {
+ checkVersion(protocol, clientVersion, proxy);
+ }
+ return proxy;
+ }
+
+ /**
+ * Get server version and then compare it with client version.
+ *
+ * @param protocol
+ * @param clientVersion
+ * @param proxy
+ * @throws IOException
+ */
+ public static void checkVersion(Class<? extends VersionedProtocol> protocol,
+ long clientVersion, VersionedProtocol proxy) throws IOException {
+ long serverVersion = proxy.getProtocolVersion(protocol.getName(),
+ clientVersion);
+ if (serverVersion != clientVersion) {
+ throw new VersionMismatch(protocol.getName(), clientVersion,
+ serverVersion);
+ }
+ }
+
+ /**
+ * Construct a client-side proxy object with the default SocketFactory
+ *
+ * @param protocol
+ * @param clientVersion
+ * @param addr
+ * @param conf
+ * @return a proxy instance
+ * @throws IOException
+ */
+ public static VersionedProtocol getProxy(
+ Class<? extends VersionedProtocol> protocol, long clientVersion,
+ InetSocketAddress addr, Configuration conf) throws IOException {
+ return getProxy(protocol, clientVersion, addr, conf,
+ BSPNetUtils.getDefaultSocketFactory(conf), 0);
+ }
+
+ /**
+ * Get VersionedProtocol
+ *
+ * @param protocol
+ * @param clientVersion
+ * @param addr
+ * @param conf
+ * @param rpcTimeout
+ * @return the proxy
+ * @throws IOException
+ */
+ public static VersionedProtocol getProxy(
+ Class<? extends VersionedProtocol> protocol, long clientVersion,
+ InetSocketAddress addr, Configuration conf, int rpcTimeout)
+ throws IOException {
+
+ return getProxy(protocol, clientVersion, addr, conf,
+ BSPNetUtils.getDefaultSocketFactory(conf), rpcTimeout);
+ }
+
+ /**
+ * Stop this proxy and release its invoker's resource
+ *
+ * @param proxy the proxy to be stopped
+ */
+ public static void stopProxy(VersionedProtocol proxy) {
+ if (proxy != null) {
+ ((Invoker) Proxy.getInvocationHandler(proxy)).close();
+ }
+ }
+
+ /**
+ * Expert: Make multiple, parallel calls to a set of servers.
+ *
+ * @param method
+ * @param params
+ * @param addrs
+ * @param ticket
+ * @param conf
+ * @return response object array
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public static Object[] call(Method method, Object[][] params,
+ InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf)
+ throws IOException, InterruptedException {
+
+ Invocation[] invocations = new Invocation[params.length];
+ for (int i = 0; i < params.length; i++)
+ invocations[i] = new Invocation(method, params[i]);
+ AsyncClient client = CLIENTS.getClient(conf);
+ try {
+ Writable[] wrappedValues = client.call(invocations, addrs,
+ method.getDeclaringClass(), ticket, conf);
+
+ if (method.getReturnType() == Void.TYPE) {
+ return null;
+ }
+
+ Object[] values = (Object[]) Array.newInstance(method.getReturnType(),
+ wrappedValues.length);
+ for (int i = 0; i < values.length; i++)
+ if (wrappedValues[i] != null)
+ values[i] = ((ObjectWritable) wrappedValues[i]).get();
+
+ return values;
+ } finally {
+ CLIENTS.stopClient(client);
+ }
+ }
+
+ /**
+ * Construct a server for a protocol implementation instance listening on a
+ * port and address.
+ *
+ * @param instance
+ * @param bindAddress
+ * @param port
+ * @param conf
+ * @return server
+ * @throws IOException
+ */
+ public static NioServer getServer(final Object instance,
+ final String bindAddress, final int port, Configuration conf)
+ throws IOException {
+ return getServer(instance, bindAddress, port, 1, false, conf);
+ }
+
+ /**
+ * Construct a server for a protocol implementation instance listening on a
+ * port and address.
+ *
+ * @param instance
+ * @param bindAddress
+ * @param port
+ * @param numHandlers
+ * @param verbose
+ * @param conf
+ * @return server
+ * @throws IOException
+ */
+ public static NioServer getServer(final Object instance,
+ final String bindAddress, final int port, final int numHandlers,
+ final boolean verbose, Configuration conf) throws IOException {
+ return getServer(instance, bindAddress, port, numHandlers, verbose, conf,
+ null);
+ }
+
+ /**
+ * Construct a server for a protocol implementation instance listening on a
+ * port and address, with a secret manager.
+ *
+ * @param instance
+ * @param bindAddress
+ * @param port
+ * @param numHandlers
+ * @param verbose
+ * @param conf
+ * @param secretManager
+ * @return server
+ * @throws IOException
+ */
+ public static NioServer getServer(final Object instance,
+ final String bindAddress, final int port, final int numHandlers,
+ final boolean verbose, Configuration conf,
+ SecretManager<? extends TokenIdentifier> secretManager)
+ throws IOException {
+ return new NioServer(instance, conf, bindAddress, port, numHandlers,
+ verbose, secretManager);
+ }
+
+ /** An RPC Server. */
+ public static class NioServer extends org.apache.hama.ipc.AsyncServer {
+ private Object instance;
+ private boolean verbose;
+
+ /**
+ * Construct an RPC server.
+ *
+ * @param instance the instance whose methods will be called
+ * @param conf the configuration to use
+ * @param bindAddress the address to bind on to listen for connection
+ * @param port the port to listen for connections on
+ * @throws IOException
+ */
+ public NioServer(Object instance, Configuration conf, String bindAddress,
+ int port) throws IOException {
+ this(instance, conf, bindAddress, port, 1, false, null);
+ }
+
+ private static String classNameBase(String className) {
+ String[] names = className.split("\\.", -1);
+ if (names == null || names.length == 0) {
+ return className;
+ }
+ return names[names.length - 1];
+ }
+
+ /**
+ * Construct an RPC server.
+ *
+ * @param instance the instance whose methods will be called
+ * @param conf the configuration to use
+ * @param bindAddress the address to bind on to listen for connection
+ * @param port the port to listen for connections on
+ * @param numHandlers the number of method handler threads to run
+ * @param verbose whether each call should be logged
+ * @throws IOException
+ */
+ public NioServer(Object instance, Configuration conf, String bindAddress,
+ int port, int numHandlers, boolean verbose,
+ SecretManager<? extends TokenIdentifier> secretManager)
+ throws IOException {
+ super(bindAddress, port, Invocation.class, numHandlers, conf,
+ classNameBase(instance.getClass().getName()), secretManager);
+ this.instance = instance;
+ this.verbose = verbose;
+ }
+
+ public Writable call(Class<?> protocol, Writable param, long receivedTime)
+ throws IOException {
+ try {
+ Invocation call = (Invocation) param;
+ if (verbose)
+ log("Call: " + call);
+
+ Method method = protocol.getMethod(call.getMethodName(),
+ call.getParameterClasses());
+ method.setAccessible(true);
+
+ long startTime = System.currentTimeMillis();
+ Object value = method.invoke(instance, call.getParameters());
+ int processingTime = (int) (System.currentTimeMillis() - startTime);
+ int qTime = (int) (startTime - receivedTime);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Served: " + call.getMethodName() + " queueTime= " + qTime
+ + " procesingTime= " + processingTime);
+ }
+ if (verbose)
+ log("Return: " + value);
+
+ return new ObjectWritable(method.getReturnType(), value);
+
+ } catch (InvocationTargetException e) {
+ Throwable target = e.getTargetException();
+ if (target instanceof IOException) {
+ throw (IOException) target;
+ } else {
+ IOException ioe = new IOException(target.toString());
+ ioe.setStackTrace(target.getStackTrace());
+ throw ioe;
+ }
+ } catch (Throwable e) {
+ if (!(e instanceof IOException)) {
+ LOG.error("Unexpected throwable object ", e);
+ }
+ IOException ioe = new IOException(e.toString());
+ ioe.setStackTrace(e.getStackTrace());
+ throw ioe;
+ }
+ }
+ }
+
+ private static void log(String value) {
+ if (value != null && value.length() > 55)
+ value = value.substring(0, 55) + "...";
+ LOG.info(value);
+ }
+}
Propchange: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncRPC.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncRPC.java
------------------------------------------------------------------------------
svn:mime-type = text/plain