You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by bs...@apache.org on 2014/09/30 02:44:15 UTC

svn commit: r1628344 [1/2] - in /hama/trunk/core: ./ src/main/java/org/apache/hama/bsp/message/ src/main/java/org/apache/hama/ipc/ src/test/java/org/apache/hama/bsp/message/ src/test/java/org/apache/hama/ipc/

Author: bsmin
Date: Tue Sep 30 00:44:14 2014
New Revision: 1628344

URL: http://svn.apache.org/r1628344
Log:
HAMA-913: add RPC implementation using Netty

Added:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java   (with props)
    hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncClient.java   (with props)
    hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncRPC.java   (with props)
    hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncServer.java   (with props)
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java   (with props)
    hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncIPC.java   (with props)
    hama/trunk/core/src/test/java/org/apache/hama/ipc/TestAsyncRPC.java   (with props)
Modified:
    hama/trunk/core/pom.xml

Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1628344&r1=1628343&r2=1628344&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Tue Sep 30 00:44:14 2014
@@ -139,6 +139,11 @@
       <groupId>com.esotericsoftware.kryo</groupId>
       <artifactId>kryo</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+      <version>4.0.21.Final</version>
+    </dependency>    
   </dependencies>
 
   <build>

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java?rev=1628344&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java Tue Sep 30 00:44:14 2014
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.BSPPeerImpl;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
+import org.apache.hama.ipc.AsyncRPC;
+import org.apache.hama.ipc.AsyncServer;
+import org.apache.hama.util.LRUCache;
+
+/**
+ * Netty-backed implementation of the {@link HamaMessageManager}.
+ * <p>
+ * Incoming bundles are received by an {@link AsyncServer} that is started in
+ * {@code init}; outgoing bundles are sent through {@link AsyncRPC} proxies that
+ * are kept in an LRU cache keyed by the peer's socket address.
+ */
+public final class HamaAsyncMessageManagerImpl<M extends Writable> extends
+    AbstractMessageManager<M> implements HamaMessageManager<M> {
+
+  private static final Log LOG = LogFactory
+      .getLog(HamaAsyncMessageManagerImpl.class);
+
+  /** Number of follow-up ports probed when the requested port is in use. */
+  private static final int MAX_BIND_RETRIES = 5;
+
+  private AsyncServer server;
+
+  private LRUCache<InetSocketAddress, HamaMessageManager<M>> peersLRUCache = null;
+
+  /**
+   * Initializes the base manager, starts the RPC server on the peer address
+   * and creates the proxy cache. A proxy evicted from the cache is stopped via
+   * {@link AsyncRPC#stopProxy}.
+   */
+  @SuppressWarnings("serial")
+  @Override
+  public final void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
+      HamaConfiguration conf, InetSocketAddress peerAddress) {
+    super.init(attemptId, peer, conf, peerAddress);
+    startRPCServer(conf, peerAddress);
+    peersLRUCache = new LRUCache<InetSocketAddress, HamaMessageManager<M>>(
+        maxCachedConnections) {
+      @Override
+      protected final boolean removeEldestEntry(
+          Map.Entry<InetSocketAddress, HamaMessageManager<M>> eldest) {
+        if (size() > this.capacity) {
+          HamaMessageManager<M> proxy = eldest.getValue();
+          AsyncRPC.stopProxy(proxy);
+          return true;
+        }
+        return false;
+      }
+    };
+  }
+
+  /**
+   * Starts the RPC server and converts a start-up failure into an unchecked
+   * exception.
+   * 
+   * @param conf configuration (the server itself reads the inherited
+   *          {@code conf} field)
+   * @param peerAddress host and first port to try
+   * @throws RuntimeException if the server could not be started; the
+   *           underlying {@link IOException} is kept as the cause
+   */
+  private final void startRPCServer(Configuration conf,
+      InetSocketAddress peerAddress) {
+    try {
+      startServer(peerAddress.getHostName(), peerAddress.getPort());
+    } catch (IOException ioe) {
+      LOG.error("Fail to start RPC server!", ioe);
+      throw new RuntimeException("RPC Server could not be launched!", ioe);
+    }
+  }
+
+  /**
+   * Starts the server on {@code port}; when that port is already bound, probes
+   * the following ports, at most {@link #MAX_BIND_RETRIES} times.
+   * 
+   * @param hostName host name to bind to
+   * @param port first port to try
+   * @throws IOException if the server fails for a reason other than the port
+   *           being in use
+   * @throws RuntimeException if every probed port is in use
+   */
+  private void startServer(String hostName, int port) throws IOException {
+    for (int attempt = 0;; attempt++) {
+      int candidatePort = port + attempt;
+      try {
+        AsyncServer candidate = AsyncRPC.getServer(this, hostName,
+            candidatePort,
+            conf.getInt("hama.default.messenger.handler.threads.num", 5), false,
+            conf);
+
+        candidate.start();
+        // publish the server only once it is actually running
+        this.server = candidate;
+        LOG.info("BSPPeer address:" + server.getAddress().getHostName()
+            + " port:" + server.getAddress().getPort());
+        return;
+      } catch (BindException e) {
+        if (attempt >= MAX_BIND_RETRIES) {
+          throw new RuntimeException("RPC Server could not be launched!", e);
+        }
+        // parentheses matter: without them the "1" is appended as text
+        LOG.warn("Address already in use. Retrying " + hostName + ":"
+            + (candidatePort + 1));
+      }
+    }
+  }
+
+  /** Closes the base manager and stops the RPC server if one was started. */
+  @Override
+  public final void close() {
+    super.close();
+    if (server != null) {
+      server.stop();
+    }
+  }
+
+  /**
+   * Sends a bundle to the peer at {@code addr} and adds the bundle length to
+   * the {@code MESSAGE_BYTES_TRANSFERED} counter.
+   * 
+   * @throws IllegalArgumentException if no connection to {@code addr} is
+   *           available
+   */
+  @Override
+  public final void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
+      throws IOException {
+    HamaMessageManager<M> bspPeerConnection = this.getBSPPeerConnection(addr);
+    if (bspPeerConnection == null) {
+      throw new IllegalArgumentException("Can not find " + addr.toString()
+          + " to transfer messages to!");
+    } else {
+      peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+          bundle.getLength());
+      bspPeerConnection.put(bundle);
+    }
+  }
+
+  /**
+   * @param addr socket address to which BSP Peer Connection will be established
+   * @return BSP Peer Connection; the cached connection if there is one,
+   *         otherwise a new connection that is cached before being returned
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  protected final HamaMessageManager<M> getBSPPeerConnection(
+      InetSocketAddress addr) throws IOException {
+    // single lookup; a miss (null) falls through to proxy creation
+    HamaMessageManager<M> bspPeerConnection = peersLRUCache.get(addr);
+    if (bspPeerConnection == null) {
+      bspPeerConnection = (HamaMessageManager<M>) AsyncRPC.getProxy(
+          HamaMessageManager.class, HamaRPCProtocolVersion.versionID, addr,
+          this.conf);
+      peersLRUCache.put(addr, bspPeerConnection);
+    }
+    return bspPeerConnection;
+  }
+
+  /** Delivers a single message to the local queue. */
+  @Override
+  public final void put(M msg) throws IOException {
+    loopBackMessage(msg);
+  }
+
+  /** Delivers a whole bundle to the local queue. */
+  @Override
+  public final void put(BSPMessageBundle<M> bundle) throws IOException {
+    loopBackBundle(bundle);
+  }
+
+  @Override
+  public final long getProtocolVersion(String arg0, long arg1)
+      throws IOException {
+    return versionID;
+  }
+
+  /**
+   * @return the address the RPC server listens on, or {@code null} if the
+   *         server has not been started
+   */
+  @Override
+  public InetSocketAddress getListenerAddress() {
+    if (this.server != null) {
+      return this.server.getAddress();
+    }
+    return null;
+  }
+
+}

Propchange: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncClient.java?rev=1628344&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncClient.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncClient.java Tue Sep 30 00:44:14 2014
@@ -0,0 +1,1138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.ReferenceCountUtil;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.BSPNetUtils;
+
+/**
+ * A client for an IPC service using netty. IPC calls take a single
+ * {@link Writable} as a parameter, and return a {@link Writable} as their
+ * value. A service runs on a port and is defined by a parameter class and a
+ * value class.
+ * 
+ * @see AsyncServer
+ */
+public class AsyncClient {
+  // Configuration key and default for the connect retry limit.
+  // NOTE(review): neither constant is referenced in the visible part of this
+  // class - confirm where they are used.
+  private static final String IPC_CLIENT_CONNECT_MAX_RETRIES_KEY = "ipc.client.connect.max.retries";
+  private static final int IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT = 10;
+  private static final Log LOG = LogFactory.getLog(AsyncClient.class);
+  // Connections keyed by their ConnectionId; Connection.close() removes its
+  // own entry from this table.
+  private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();
+
+  private Class<? extends Writable> valueClass; // class of call values
+  private int counter = 0; // counter for call ids
+  private AtomicBoolean running = new AtomicBoolean(true); // if client runs
+  final private Configuration conf; // configuration obj
+
+  private SocketFactory socketFactory; // only use in order to meet the
+                                       // consistency with other clients
+  // Reference count, changed through incCount()/decCount().
+  private int refCount = 1;
+
+  // Configuration key and default value (in milliseconds) of the ping interval.
+  final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
+  final static int DEFAULT_PING_INTERVAL = 60000; // 1 min
+
+  /**
+   * Stores the ping interval in the configuration under
+   * {@code ipc.ping.interval}.
+   * 
+   * @param conf configuration to update
+   * @param pingInterval the ping interval to store
+   */
+  public static final void setPingInterval(Configuration conf, int pingInterval) {
+    conf.setInt(PING_INTERVAL_NAME, pingInterval);
+  }
+
+  /**
+   * Get the ping interval from configuration; If not set in the configuration,
+   * return the default value.
+   * 
+   * @param conf Configuration
+   * @return the ping interval
+   */
+  final static int getPingInterval(Configuration conf) {
+    return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
+  }
+
+  /**
+   * The time after which a RPC will timeout. If ping is not enabled (via
+   * ipc.client.ping), then the timeout value is the same as the pingInterval.
+   * If ping is enabled, then there is no timeout value.
+   * 
+   * @param conf Configuration
+   * @return the timeout period in milliseconds. -1 if no timeout value is set
+   */
+  final public static int getTimeout(Configuration conf) {
+    if (!conf.getBoolean("ipc.client.ping", true)) {
+      return getPingInterval(conf);
+    }
+    return -1;
+  }
+
+  /**
+   * Adds one reference to this client.
+   */
+  synchronized void incCount() {
+    refCount += 1;
+  }
+
+  /**
+   * Removes one reference from this client.
+   */
+  synchronized void decCount() {
+    refCount -= 1;
+  }
+
+  /**
+   * Tells whether the reference count has dropped to zero.
+   * 
+   * @return true if nothing references this client any more; false otherwise
+   */
+  synchronized boolean isZeroReference() {
+    boolean unreferenced = (refCount == 0);
+    return unreferenced;
+  }
+
+  /**
+   * A connection to one remote address, backed by a Netty channel. Calls are
+   * multiplexed through this channel, so responses may be delivered out of
+   * order; the channel's inbound handler reads them and notifies the callers.
+   */
+  private class Connection {
+    private InetSocketAddress serverAddress; // server ip:port
+    private ConnectionHeader header; // connection header
+    private final ConnectionId remoteId; // connection id
+    private AuthMethod authMethod; // authentication method
+
+    // Netty client state; group and bootstrap are re-created by
+    // setupConnection() on every connect attempt.
+    private EventLoopGroup group;
+    private Bootstrap bootstrap;
+    private Channel channel; // set once setupConnection() has connected
+    private int rpcTimeout; // when > 0 it overwrites pingInterval
+    private int maxIdleTime; // connections will be culled if it was idle
+
+    private final RetryPolicy connectionRetryPolicy;
+    private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
+    // how often sends ping to the server in msecs; also passed to Netty as
+    // the connect timeout
+    private int pingInterval;
+
+    // currently active calls
+    private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
+    // set to true by markClosed(); addCall() rejects calls afterwards
+    private AtomicBoolean shouldCloseConnection = new AtomicBoolean();
+    private IOException closeException; // if the connection is closed, close
+                                        // reason
+
+    /**
+     * Copies the connection settings from {@code remoteId} and prepares the
+     * connection header. No network I/O happens here.
+     * 
+     * @param remoteId remote connection Id
+     * @throws IOException an {@link UnknownHostException} if the remote
+     *           address is unresolved
+     */
+    public Connection(ConnectionId remoteId) throws IOException {
+      this.remoteId = remoteId;
+      this.serverAddress = remoteId.getAddress();
+      if (serverAddress.isUnresolved()) {
+        throw new UnknownHostException("unknown host: "
+            + remoteId.getAddress().getHostName());
+      }
+      // Allocate the Netty objects only after the address has been validated,
+      // so that a rejected address does not leave an event loop group behind
+      // that nobody will ever shut down.
+      group = new NioEventLoopGroup();
+      bootstrap = new Bootstrap();
+      this.maxIdleTime = remoteId.getMaxIdleTime();
+      this.connectionRetryPolicy = remoteId.connectionRetryPolicy;
+      this.tcpNoDelay = remoteId.getTcpNoDelay();
+      this.pingInterval = remoteId.getPingInterval();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The ping interval is" + this.pingInterval + "ms.");
+      }
+      this.rpcTimeout = remoteId.getRpcTimeout();
+      Class<?> protocol = remoteId.getProtocol();
+
+      authMethod = AuthMethod.SIMPLE;
+      header = new ConnectionHeader(protocol == null ? null
+          : protocol.getName(), null, authMethod);
+    }
+
+    /**
+     * Registers a call with this connection and wakes up one waiter on the
+     * connection monitor. Nothing is registered once the connection has been
+     * marked closed.
+     * 
+     * @param call to add
+     * @return true if the call was added, false if the connection is closing
+     */
+    private synchronized boolean addCall(Call call) {
+      boolean accepting = !shouldCloseConnection.get();
+      if (accepting) {
+        calls.put(call.id, call);
+        notify();
+      }
+      return accepting;
+    }
+
+    /**
+     * Looks the server's host name up again and, if it now maps to a different
+     * address, adopts the new address.
+     * 
+     * @return true if the stored server address was replaced
+     */
+    private synchronized boolean updateAddress() throws IOException {
+      String host = serverAddress.getHostName();
+      int port = serverAddress.getPort();
+      // fresh lookup with the old host name
+      InetSocketAddress resolved = BSPNetUtils.makeSocketAddr(host, port);
+
+      if (serverAddress.equals(resolved)) {
+        return false;
+      }
+      LOG.warn("Address change detected. Old: " + serverAddress.toString()
+          + " New: " + resolved.toString());
+      serverAddress = resolved;
+      return true;
+    }
+
+    /**
+     * Connects to the server and sends the connection header, unless an active
+     * channel already exists. Any failure marks the connection closed with the
+     * failure as reason and then closes it.
+     */
+    private void setupIOstreams() throws InterruptedException {
+      boolean alreadyConnected = channel != null && channel.isActive();
+      if (alreadyConnected) {
+        return;
+      }
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Connecting to " + serverAddress);
+        }
+
+        setupConnection();
+        writeHeader();
+      } catch (Throwable t) {
+        // non-IO failures are wrapped so that the close reason is always an
+        // IOException
+        IOException reason = (t instanceof IOException) ? (IOException) t
+            : new IOException("Couldn't set up IO streams", t);
+        markClosed(reason);
+        close();
+      }
+    }
+
+    /**
+     * Configures the Netty client and connects to the server, retrying for as
+     * long as the connection retry policy allows.
+     * 
+     * @throws Exception the last connect failure once the retry policy refuses
+     *           another attempt, or a failure of the policy itself
+     */
+    private void setupConnection() throws Exception {
+      // Must live outside the loop: declared inside, the counter was reset on
+      // every iteration and the retry policy always saw zero retries.
+      short ioFailures = 0;
+      while (true) {
+        try {
+          // rpcTimeout overwrites pingInterval
+          if (rpcTimeout > 0) {
+            pingInterval = rpcTimeout;
+          }
+
+          // Release the group left over from the constructor or from an
+          // earlier attempt before it is replaced below.
+          if (group != null && !group.isShuttingDown()) {
+            group.shutdownGracefully();
+          }
+
+          // Configure the client.
+          // NioEventLoopGroup is a multithreaded event loop that handles I/O
+          // operation
+          group = new NioEventLoopGroup();
+          // Bootstrap is a helper class that sets up a client
+          bootstrap = new Bootstrap();
+          // A Bootstrap keeps a single handler, so only the initializer is
+          // registered (an earlier LoggingHandler registration was silently
+          // overwritten by it and never took effect).
+          bootstrap.group(group).channel(NioSocketChannel.class)
+              .option(ChannelOption.TCP_NODELAY, this.tcpNoDelay)
+              .option(ChannelOption.SO_KEEPALIVE, true)
+              .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, pingInterval)
+              .option(ChannelOption.SO_SNDBUF, 30 * 1024 * 1024)
+              .handler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                public void initChannel(SocketChannel ch) throws Exception {
+                  ChannelPipeline p = ch.pipeline();
+                  // NOTE(review): this IdleStateHandler constructor takes
+                  // seconds - confirm the unit of maxIdleTime.
+                  p.addLast(new IdleStateHandler(0, 0, maxIdleTime));
+                  // Register message processing handler
+                  p.addLast(new NioClientInboundHandler());
+                }
+              });
+
+          // Connect to the server and wait until the attempt has completed.
+          ChannelFuture channelFuture = bootstrap.connect(
+              serverAddress.getAddress(), serverAddress.getPort()).sync();
+          // Get io channel
+          channel = channelFuture.channel();
+          LOG.info("AsyncClient startup");
+          break;
+        } catch (Exception ie) {
+          /*
+           * Check for an address change and update the local reference. Reset
+           * the failure counter if the address was changed
+           */
+
+          if (updateAddress()) {
+            ioFailures = 0;
+          }
+          // throws once the retry policy gives up, which ends the loop
+          handleConnectionFailure(ioFailures++, ie);
+        }
+      }
+    }
+
+    /**
+     * Writes the RPC header (magic bytes, version, authentication method)
+     * followed by the length-prefixed connection header to the channel.
+     * Failures are logged and not propagated.
+     */
+    private void writeHeader() {
+      DataOutputBuffer rpcBuff = null;
+      DataOutputBuffer headerBuf = null;
+      ByteBuf buf = null;
+      try {
+        buf = channel.alloc().buffer();
+        rpcBuff = new DataOutputBuffer();
+        authMethod.write(rpcBuff);
+
+        headerBuf = new DataOutputBuffer();
+        header.write(headerBuf);
+        byte[] data = headerBuf.getData();
+        int dataLength = headerBuf.getLength();
+        // write rpcheader
+        buf.writeInt(AsyncServer.HEADER_LENGTH + dataLength);
+        buf.writeBytes(AsyncServer.HEADER.array());
+        buf.writeByte(AsyncServer.CURRENT_VERSION);
+        buf.writeByte(rpcBuff.getData()[0]);
+        // write header
+        buf.writeInt(dataLength);
+        buf.writeBytes(data, 0, dataLength);
+
+        // Netty releases the buffer once it has been handed to the channel,
+        // so give up ownership before the write.
+        ByteBuf outgoing = buf;
+        buf = null;
+        channel.writeAndFlush(outgoing);
+      } catch (Exception e) {
+        // pass the exception itself so that the stack trace is logged
+        LOG.error("Couldn't send header", e);
+      } finally {
+        if (buf != null) {
+          // never handed to the channel: release it to avoid a buffer leak
+          buf.release();
+        }
+        IOUtils.closeStream(rpcBuff);
+        IOUtils.closeStream(headerBuf);
+      }
+    }
+
+    /**
+     * Gracefully shuts down the event loop group of this connection unless it
+     * has already terminated. Failures are logged as warnings.
+     */
+    private void closeConnection() {
+      try {
+        if (this.group.isTerminated()) {
+          return;
+        }
+        this.group.shutdownGracefully();
+        LOG.info("client gracefully shutdown");
+      } catch (Exception e) {
+        LOG.warn("Not able to close a client", e);
+      }
+    }
+
+    /**
+     * Inbound handler that processes the response messages received from the
+     * server.
+     */
+    private class NioClientInboundHandler extends ChannelInboundHandlerAdapter {
+
+      /**
+       * Decodes every response contained in the received buffer and completes
+       * the matching pending calls. Each response consists of a call id, a
+       * status and a status-specific payload.
+       * <p>
+       * NOTE(review): there is no frame decoder in front of this handler, so
+       * this assumes a buffer always holds complete responses - confirm.
+       * 
+       * @param ctx channel handler context
+       * @param msg received message, expected to be a {@link ByteBuf}
+       */
+      @Override
+      public void channelRead(ChannelHandlerContext ctx, Object msg) {
+        ByteBuf byteBuf = (ByteBuf) msg;
+        ByteBufInputStream byteBufInputStream = new ByteBufInputStream(byteBuf);
+        DataInputStream in = new DataInputStream(byteBufInputStream);
+        try {
+          while (in.available() > 0) {
+            // try to read an id
+            int id = in.readInt();
+
+            if (LOG.isDebugEnabled())
+              LOG.debug(serverAddress.getHostName() + " got value #" + id);
+
+            // May be null when the call is no longer registered; the payload
+            // is still consumed so that the next response parses correctly.
+            Call call = calls.get(id);
+            if (call == null) {
+              LOG.warn("Received a response for unknown call #" + id);
+            }
+
+            // read call status
+            int state = in.readInt();
+            if (state == Status.SUCCESS.state) {
+              Writable value = ReflectionUtils.newInstance(valueClass, conf);
+              value.readFields(in); // read value
+              if (call != null) {
+                call.setValue(value);
+                calls.remove(id);
+              }
+            } else if (state == Status.ERROR.state) {
+              String className = WritableUtils.readString(in);
+              byte[] errorBytes = new byte[in.available()];
+              in.readFully(errorBytes);
+              if (call != null) {
+                call.setException(new RemoteException(className, new String(
+                    errorBytes)));
+                calls.remove(id);
+              }
+            } else if (state == Status.FATAL.state) {
+              // Close the connection
+              markClosed(new RemoteException(WritableUtils.readString(in),
+                  WritableUtils.readString(in)));
+            } else {
+              byte[] garbageBytes = new byte[in.available()];
+              in.readFully(garbageBytes);
+            }
+          }
+        } catch (IOException e) {
+          // Stop decoding: a failed read may leave bytes unread, and retrying
+          // on them would spin forever.
+          markClosed(e);
+        } finally {
+          // release the buffer on every path, including runtime exceptions
+          IOUtils.closeStream(in);
+          IOUtils.closeStream(byteBufInputStream);
+          ReferenceCountUtil.release(msg);
+        }
+      }
+
+      /**
+       * Called with the Throwable raised by an I/O error. The exception is
+       * logged together with its stack trace and the channel is closed.
+       * 
+       * @param ctx channel handler context
+       * @param cause the error that occurred
+       */
+      @Override
+      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        LOG.error("Occured I/O Error : " + cause.getMessage(), cause);
+        ctx.close();
+      }
+
+      /**
+       * Reacts to idle-state events: unless there are pending calls on a
+       * running, not yet closing connection, the connection is marked closed
+       * and shut down.
+       * <p>
+       * NOTE(review): the IdleStateHandler installed in setupConnection() only
+       * configures an all-idle timeout, so the {@code != ALL_IDLE} test below
+       * looks inverted and this branch may never run. Left unchanged because
+       * enabling it alters connection lifetime - confirm the intent.
+       * 
+       * @param ctx channel handler context
+       * @param evt the user event, handled only if it is an IdleStateEvent
+       */
+      @Override
+      public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
+        if (evt instanceof IdleStateEvent) {
+          IdleStateEvent e = (IdleStateEvent) evt;
+          if (e.state() != IdleState.ALL_IDLE) {
+            if (!calls.isEmpty() && !shouldCloseConnection.get()
+                && running.get()) {
+              return;
+            } else if (shouldCloseConnection.get()) {
+              markClosed(null);
+            } else if (calls.isEmpty()) { // idle connection closed or stopped
+              markClosed(null);
+            } else { // get stopped but there are still pending requests
+              markClosed((IOException) new IOException()
+                  .initCause(new InterruptedException()));
+            }
+            closeConnection();
+          }
+        }
+
+      }
+    }
+
+    /**
+     * Handles a connection failure with a fixed retry limit: the connection is
+     * shut down; if the current number of retries has reached the maximum the
+     * exception is rethrown, otherwise the method backs off for one second so
+     * that the caller can try again. Currently unused.
+     * 
+     * @param curRetries current number of retries
+     * @param maxRetries max number of retries allowed
+     * @param ioe failure reason
+     * @throws IOException if max number of retries is reached
+     */
+    @SuppressWarnings("unused")
+    private void handleConnectionFailure(int curRetries, int maxRetries,
+        IOException ioe) throws IOException {
+
+      closeConnection();
+
+      // throw the exception if the maximum number of retries is reached
+      if (curRetries >= maxRetries) {
+        throw ioe;
+      }
+
+      // otherwise back off and retry
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        // keep the interrupt visible to the caller instead of swallowing it
+        Thread.currentThread().interrupt();
+      }
+
+      LOG.info("Retrying connect to server: " + serverAddress
+          + ". Already tried " + curRetries + " time(s); maxRetries="
+          + maxRetries);
+    }
+
+    /**
+     * Handles a connection failure on behalf of setupConnection(): shuts the
+     * connection down and asks the connection retry policy whether another
+     * attempt should be made. Returns normally if so, otherwise rethrows the
+     * failure.
+     * 
+     * @param curRetries current number of retries
+     * @param ioe failure reason
+     * @throws Exception the failure itself if no retry is allowed, or an
+     *           IOException if the retry policy fails
+     */
+    private void handleConnectionFailure(int curRetries, Exception ioe)
+        throws Exception {
+      closeConnection();
+
+      boolean retryAllowed;
+      try {
+        retryAllowed = connectionRetryPolicy.shouldRetry(ioe, curRetries);
+      } catch (Exception e) {
+        if (e instanceof IOException) {
+          throw (IOException) e;
+        }
+        throw new IOException(e);
+      }
+
+      if (retryAllowed) {
+        LOG.info("Retrying connect to server: " + serverAddress
+            + ". Already tried " + curRetries + " time(s); retry policy is "
+            + connectionRetryPolicy);
+        return;
+      }
+      throw ioe;
+    }
+
+    /**
+     * Gives the address of the server this connection talks to.
+     * 
+     * @return remote server address
+     */
+    public InetSocketAddress getRemoteAddress() {
+      return this.serverAddress;
+    }
+
+    /**
+     * Initiates a call by sending the parameter to the remote server.
+     * 
+     * @param sendCall
+     */
+    public void sendParam(Call sendCall) {
+      if (LOG.isDebugEnabled())
+        LOG.debug(this.getClass().getName() + " sending #" + sendCall.id);
+      DataOutputBuffer buff = null;
+      try {
+        buff = new DataOutputBuffer();
+        buff.writeInt(sendCall.id);
+        sendCall.param.write(buff);
+        byte[] data = buff.getData();
+        int dataLength = buff.getLength();
+        ByteBuf buf = channel.alloc().buffer();
+
+        buf.writeInt(dataLength);
+        buf.writeBytes(data, 0, dataLength);
+        ChannelFuture channelFuture = channel.writeAndFlush(buf);
+        if (channelFuture.cause() != null) {
+          throw channelFuture.cause();
+        }
+      } catch (IOException ioe) {
+        markClosed(ioe);
+      } catch (Throwable t) {
+        markClosed(new IOException(t));
+      } finally {
+        // the buffer is just an in-memory buffer, but it is still
+        // polite to close early
+        IOUtils.closeStream(buff);
+      }
+    }
+
+    /**
+     * Marks the connection as to-be-closed. Only the first invocation records
+     * the reason and wakes up the threads waiting on this connection; later
+     * invocations do nothing.
+     * 
+     * @param ioe reason for closing, may be null
+     **/
+    private synchronized void markClosed(IOException ioe) {
+      boolean firstToClose = shouldCloseConnection.compareAndSet(false, true);
+      if (!firstToClose) {
+        return;
+      }
+      closeException = ioe;
+      notifyAll();
+    }
+
+    /**
+     * Closes a connection that has been marked closed: removes it from the
+     * client's connection table, shuts it down and fails all pending calls
+     * with the close reason.
+     */
+    private synchronized void close() {
+      if (!shouldCloseConnection.get()) {
+        LOG.error("The connection is not in the closed state");
+        return;
+      }
+
+      // first take the connection out of the connection list, but only if the
+      // registered entry really is this connection
+      synchronized (connections) {
+        if (connections.get(remoteId) == this) {
+          connections.remove(remoteId).closeConnection();
+        }
+      }
+
+      // then fail whatever calls are still pending
+      if (closeException != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("closing ipc connection to " + serverAddress + ": "
+              + closeException.getMessage(), closeException);
+        }
+        cleanupCalls();
+      } else if (!calls.isEmpty()) {
+        LOG.warn("A connection is closed for no cause and calls are not empty");
+
+        // no reason was recorded, so supply one for the pending calls
+        closeException = new IOException("Unexpected closed connection");
+        cleanupCalls();
+      }
+
+      if (LOG.isDebugEnabled())
+        LOG.debug(serverAddress.getHostName() + ": closed");
+    }
+
+    /** Fails every pending call with the close reason and forgets it. */
+    private void cleanupCalls() {
+      for (Iterator<Entry<Integer, Call>> pending = calls.entrySet()
+          .iterator(); pending.hasNext();) {
+        Call unfinished = pending.next().getValue();
+        unfinished.setException(closeException); // local exception
+        pending.remove();
+      }
+    }
+  }
+
+  /** A pending call: a parameter that was sent and the outcome it waits for. */
+  private class Call {
+    int id; // call id
+    Writable param; // parameter
+    Writable value; // value, null if error
+    IOException error; // exception, null if value
+    boolean done; // true when call is done
+
+    /**
+     * Creates a call for the given parameter and draws the next call id from
+     * the client-wide counter.
+     */
+    protected Call(Writable param) {
+      synchronized (AsyncClient.this) {
+        this.id = counter++;
+      }
+      this.param = param;
+    }
+
+    /**
+     * Flags the call as done, meaning the value or the error is available, and
+     * wakes up the caller waiting on this call.
+     */
+    protected synchronized void callComplete() {
+      done = true;
+      notify(); // notify caller
+    }
+
+    /**
+     * Records a failure as the outcome and completes the call.
+     * 
+     * @param error exception thrown by the call; either local or remote
+     */
+    public synchronized void setException(IOException error) {
+      this.error = error;
+      callComplete();
+    }
+
+    /**
+     * Records the return value as the outcome and completes the call.
+     * 
+     * @param value return value of the call.
+     */
+    public synchronized void setValue(Writable value) {
+      this.value = value;
+      callComplete();
+    }
+  }
+
+  /**
+   * Call implementation used for parallel calls. Instead of notifying a waiter
+   * on the call itself, completion is reported to a shared
+   * {@link ParallelResults} collector.
+   */
+  private class ParallelCall extends Call {
+    private ParallelResults results; // collector shared by the whole batch
+    private int index; // slot of this call in the collector's value array
+
+    /**
+     * @param param the request payload
+     * @param results collector that receives the outcome of this call
+     * @param index position of this call's result in the collector
+     */
+    public ParallelCall(Writable param, ParallelResults results, int index) {
+      super(param);
+      this.results = results;
+      this.index = index;
+    }
+
+    /** Deliver result to result collector. */
+    @Override
+    protected void callComplete() {
+      results.callComplete(this);
+    }
+  }
+
+  /** Result collector for parallel calls. */
+  private static class ParallelResults {
+    private Writable[] values;
+    private int size;
+    private int count;
+
+    public ParallelResults(int size) {
+      this.values = new Writable[size];
+      this.size = size;
+    }
+
+    /**
+     * Collect a result.
+     * 
+     * @param call
+     */
+    public synchronized void callComplete(ParallelCall call) {
+      values[call.index] = call.value; // store the value
+      count++; // count it
+      if (count == size) // if all values are in
+        notify(); // then notify waiting caller
+    }
+  }
+
+  /**
+   * Construct an IPC client whose values are of the given {@link Writable}
+   * class.
+   * 
+   * @param valueClass the {@link Writable} class of the call values
+   * @param conf the configuration kept by this client
+   * @param factory the socket factory associated with this client
+   */
+  public AsyncClient(Class<? extends Writable> valueClass, Configuration conf,
+      SocketFactory factory) {
+    this.valueClass = valueClass;
+    this.conf = conf;
+    // The SocketFactory is kept only for consistency with the other
+    // clients
+    this.socketFactory = factory;
+  }
+
+  /**
+   * Construct an IPC client with the default SocketFactory, obtained from
+   * {@code BSPNetUtils.getDefaultSocketFactory(conf)}.
+   * 
+   * @param valueClass the {@link Writable} class of the call values
+   * @param conf the configuration kept by this client
+   */
+  public AsyncClient(Class<? extends Writable> valueClass, Configuration conf) {
+    // The SocketFactory is kept only for consistency with the other
+    // clients
+    this(valueClass, conf, BSPNetUtils.getDefaultSocketFactory(conf));
+  }
+
+  /**
+   * Return the socket factory of this client, i.e. the one passed to (or
+   * defaulted by) the constructor.
+   * 
+   * @return this client's socket factory
+   */
+  SocketFactory getSocketFactory() {
+    // The SocketFactory is kept only for consistency with the other
+    // clients
+    return socketFactory;
+  }
+
+  /**
+   * Stop all threads related to this client. No further calls may be made using
+   * this client.
+   */
+  public void stop() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Stopping client");
+    }
+
+    if (!running.compareAndSet(true, false)) {
+      return;
+    }
+
+    // wake up all connections
+    synchronized (connections) {
+      for (Connection conn : connections.values()) {
+        conn.closeConnection();
+      }
+    }
+  }
+
+  /**
+   * Make a call, passing <code>param</code>, to the IPC server running at
+   * <code>address</code> which is servicing the <code>protocol</code> protocol,
+   * with the <code>ticket</code> credentials, <code>rpcTimeout</code> as
+   * timeout and <code>conf</code> as configuration for this connection,
+   * returning the value. Throws exceptions if there are network problems or if
+   * the remote code threw an exception.
+   * <p>
+   * This overload builds a {@link ConnectionId} from its arguments and
+   * delegates to {@link #call(Writable, ConnectionId)}.
+   * 
+   * @param param the request payload
+   * @param addr address of the IPC server
+   * @param protocol protocol interface served at <code>addr</code>
+   * @param ticket credentials to use for the connection
+   * @param rpcTimeout timeout stored in the connection id
+   * @param conf configuration used to build the connection id
+   * @return Response Writable value
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public Writable call(Writable param, InetSocketAddress addr,
+      Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
+      Configuration conf) throws InterruptedException, IOException {
+    ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
+        ticket, rpcTimeout, conf);
+    return call(param, remoteId);
+  }
+
+  /**
+   * Make a call, passing <code>param</code>, to the IPC server defined by
+   * <code>remoteId</code>, returning the value. Throws exceptions if there are
+   * network problems or if the remote code threw an exception.
+   * 
+   * @param param
+   * @param remoteId
+   * @return Response Writable value
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public Writable call(Writable param, ConnectionId remoteId)
+      throws InterruptedException, IOException {
+    Call call = new Call(param);
+
+    Connection connection = getConnection(remoteId, call);
+
+    connection.sendParam(call); // send the parameter
+    boolean interrupted = false;
+
+    synchronized (call) {
+      int callFailCount = 0;
+      while (!call.done) {
+        try {
+          call.wait(1000); // wait for the result
+          // prevent client hang from response error
+          if (callFailCount++ == IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT)
+            break;
+        } catch (InterruptedException ie) {
+          interrupted = true;
+        }
+      }
+
+      if (interrupted) {
+        // set the interrupt flag now that we are done waiting
+        Thread.currentThread().interrupt();
+
+      }
+
+      if (call.error != null) {
+        if (call.error instanceof RemoteException) {
+          call.error.fillInStackTrace();
+          throw call.error;
+        } else { // local exception
+          // use the connection because it will reflect an ip change,
+          // unlike
+          // the remoteId
+          throw wrapException(connection.getRemoteAddress(), call.error);
+        }
+      } else {
+        return call.value;
+      }
+    }
+  }
+
+  /**
+   * Take an IOException and the address we were trying to connect to and return
+   * an IOException with the input exception as the cause. The new exception
+   * provides the stack trace of the place where the exception is thrown and
+   * some extra diagnostics information. If the exception is ConnectException or
+   * SocketTimeoutException, return a new one of the same type; Otherwise return
+   * an IOException.
+   * 
+   * @param addr target address
+   * @param exception the relevant exception
+   * @return an exception to throw
+   */
+  private IOException wrapException(InetSocketAddress addr,
+      IOException exception) {
+    if (exception instanceof ConnectException) {
+      // connection refused; include the host:port in the error
+      return (ConnectException) new ConnectException("Call to " + addr
+          + " failed on connection exception: " + exception)
+          .initCause(exception);
+    } else if (exception instanceof SocketTimeoutException) {
+      return (SocketTimeoutException) new SocketTimeoutException("Call to "
+          + addr + " failed on socket timeout exception: " + exception)
+          .initCause(exception);
+    } else {
+      return (IOException) new IOException("Call to " + addr
+          + " failed on local exception: " + exception).initCause(exception);
+
+    }
+  }
+
+  /**
+   * Makes a set of calls in parallel. Each parameter is sent to the
+   * corresponding address. When all values are available, or have timed out or
+   * errored, the collected results are returned in an array. The array contains
+   * nulls for calls that timed out or errored.
+   * <p>
+   * Calls that cannot even be sent (an {@link IOException} while obtaining the
+   * connection or sending) are logged and not waited for. If the calling
+   * thread is interrupted while waiting, waiting continues until all results
+   * are in and the interrupt status is restored before returning.
+   * 
+   * @param params one request payload per call
+   * @param addresses target address of each call, indexed like
+   *          <code>params</code>
+   * @param protocol protocol interface served at the addresses
+   * @param ticket credentials to use for the connections
+   * @param conf configuration used to build the connection ids
+   * @return Response Writable value array
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
+      Class<?> protocol, UserGroupInformation ticket, Configuration conf)
+      throws IOException, InterruptedException {
+    if (addresses.length == 0)
+      return new Writable[0];
+
+    ParallelResults results = new ParallelResults(params.length);
+    boolean interrupted = false;
+    synchronized (results) {
+      for (int i = 0; i < params.length; i++) {
+        ParallelCall call = new ParallelCall(params[i], results, i);
+        try {
+          ConnectionId remoteId = ConnectionId.getConnectionId(addresses[i],
+              protocol, ticket, 0, conf);
+          Connection connection = getConnection(remoteId, call);
+          connection.sendParam(call); // send each parameter
+        } catch (IOException e) {
+          // log errors
+          LOG.info("Calling " + addresses[i] + " caught: " + e.getMessage(), e);
+          results.size--; // wait for one fewer result
+        }
+      }
+
+      while (results.count != results.size) {
+        try {
+          results.wait(); // wait for all results
+        } catch (InterruptedException e) {
+          // keep waiting, but remember the interrupt for the caller
+          interrupted = true;
+        }
+      }
+
+      if (interrupted) {
+        // restore the interrupt status now that we are done waiting
+        Thread.currentThread().interrupt();
+      }
+
+      return results.values;
+    }
+  }
+
+  // for unit testing only
+  // Returns the key set of the connection table itself, not a copy; the lock
+  // on the table is held only while the set is obtained.
+  Set<ConnectionId> getConnectionIds() {
+    synchronized (connections) {
+      return connections.keySet();
+    }
+  }
+
+  /**
+   * Get a connection from the pool, or create a new one and add it to the pool.
+   * Connections to a given ConnectionId are reused. A pooled connection whose
+   * channel is missing, not writable or not active is replaced by a fresh one.
+   * The call is registered with the connection before the connection is set
+   * up.
+   * 
+   * @param remoteId identifies the server connection
+   * @param call the call to register with the connection
+   * @return connection
+   * @throws IOException if the client has been stopped
+   * @throws InterruptedException
+   */
+  private synchronized Connection getConnection(ConnectionId remoteId, Call call)
+      throws IOException, InterruptedException {
+    if (!running.get()) {
+      // the client is stopped
+      throw new IOException("The client is stopped");
+    }
+    Connection connection;
+    /*
+     * we could avoid this allocation for each RPC by having a connectionsId
+     * object and with set() method. We need to manage the refs for keys in
+     * HashMap properly. For now its ok.
+     */
+    do {
+      // The table is also modified by Connection.close() and read by stop(),
+      // both under this lock, so look-up and replacement must hold it too.
+      synchronized (connections) {
+        connection = connections.get(remoteId);
+        if (connection == null) {
+          connection = new Connection(remoteId);
+          connections.put(remoteId, connection);
+        } else if (connection.channel == null
+            || !connection.channel.isWritable()
+            || !connection.channel.isActive()) {
+          // the pooled connection is unusable: replace it
+          connection = new Connection(remoteId);
+          connections.remove(remoteId);
+          connections.put(remoteId, connection);
+        }
+      }
+    } while (!connection.addCall(call));
+    // we don't invoke the method below inside the "synchronized (connections)"
+    // block above. The reason for that is if the server happens to be slow,
+    // it will take longer to establish a connection and that would block
+    // every other user of the connection table.
+
+    connection.setupIOstreams();
+    return connection;
+  }
+
+  /**
+   * This class holds the address and the user ticket, together with the
+   * connection settings. Two ids are equal only if all of address, protocol,
+   * ticket, rpc timeout, server principal, max idle time, retry policy,
+   * tcpNoDelay and ping interval are equal, so each distinct combination maps
+   * to its own client connection.
+   */
+  static class ConnectionId {
+    InetSocketAddress address;
+    UserGroupInformation ticket;
+    Class<?> protocol;
+    private static final int PRIME = 16777619;
+    private int rpcTimeout;
+    private String serverPrincipal;
+    private int maxIdleTime; // connections will be culled if it was idle for
+                             // maxIdleTime msecs
+    private final RetryPolicy connectionRetryPolicy;
+    private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
+    private int pingInterval; // how often sends ping to the server in msecs
+
+    /**
+     * Creates an id from explicit values; see the static
+     * <code>getConnectionId</code> factories for configuration-based defaults.
+     */
+    ConnectionId(InetSocketAddress address, Class<?> protocol,
+        UserGroupInformation ticket, int rpcTimeout, String serverPrincipal,
+        int maxIdleTime, RetryPolicy connectionRetryPolicy, boolean tcpNoDelay,
+        int pingInterval) {
+      this.protocol = protocol;
+      this.address = address;
+      this.ticket = ticket;
+      this.rpcTimeout = rpcTimeout;
+      this.serverPrincipal = serverPrincipal;
+      this.maxIdleTime = maxIdleTime;
+      this.connectionRetryPolicy = connectionRetryPolicy;
+      this.tcpNoDelay = tcpNoDelay;
+      this.pingInterval = pingInterval;
+    }
+
+    /** @return the remote address */
+    InetSocketAddress getAddress() {
+      return address;
+    }
+
+    /** @return the protocol class */
+    Class<?> getProtocol() {
+      return protocol;
+    }
+
+    /** @return the rpc timeout stored in this id */
+    private int getRpcTimeout() {
+      return rpcTimeout;
+    }
+
+    /** @return the server principal, may be null */
+    String getServerPrincipal() {
+      return serverPrincipal;
+    }
+
+    /** @return the max idle time in msecs */
+    int getMaxIdleTime() {
+      return maxIdleTime;
+    }
+
+    /** @return true if Nagle's algorithm is to be disabled */
+    boolean getTcpNoDelay() {
+      return tcpNoDelay;
+    }
+
+    /** @return the ping interval in msecs */
+    int getPingInterval() {
+      return pingInterval;
+    }
+
+    /** Builds an id with an rpc timeout of 0 and the default retry policy. */
+    static ConnectionId getConnectionId(InetSocketAddress addr,
+        Class<?> protocol, UserGroupInformation ticket, Configuration conf)
+        throws IOException {
+      return getConnectionId(addr, protocol, ticket, 0, conf);
+    }
+
+    /** Builds an id with the given rpc timeout and the default retry policy. */
+    static ConnectionId getConnectionId(InetSocketAddress addr,
+        Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
+        Configuration conf) throws IOException {
+      return getConnectionId(addr, protocol, ticket, rpcTimeout, null, conf);
+    }
+
+    /**
+     * Builds an id, reading max idle time, tcpNoDelay and ping interval from
+     * <code>conf</code>. The server principal is always null here.
+     * 
+     * @param connectionRetryPolicy policy to use; when null, a fixed
+     *          one-second-sleep policy with the configured maximum retry count
+     *          is created
+     */
+    static ConnectionId getConnectionId(InetSocketAddress addr,
+        Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
+        RetryPolicy connectionRetryPolicy, Configuration conf)
+        throws IOException {
+
+      if (connectionRetryPolicy == null) {
+        final int max = conf.getInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+            IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT);
+        connectionRetryPolicy = RetryPolicies
+            .retryUpToMaximumCountWithFixedSleep(max, 1, TimeUnit.SECONDS);
+      }
+
+      return new ConnectionId(addr, protocol, ticket, rpcTimeout,
+          null,
+          conf.getInt("ipc.client.connection.maxidletime", 10000), // 10s
+          connectionRetryPolicy,
+          conf.getBoolean("ipc.client.tcpnodelay", true),
+          AsyncClient.getPingInterval(conf));
+    }
+
+    /** Null-safe equality check. */
+    static boolean isEqual(Object a, Object b) {
+      return a == null ? b == null : a.equals(b);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (obj instanceof ConnectionId) {
+        ConnectionId that = (ConnectionId) obj;
+        return isEqual(this.address, that.address)
+            && this.maxIdleTime == that.maxIdleTime
+            && isEqual(this.connectionRetryPolicy, that.connectionRetryPolicy)
+            && this.pingInterval == that.pingInterval
+            && isEqual(this.protocol, that.protocol)
+            && this.rpcTimeout == that.rpcTimeout
+            && isEqual(this.serverPrincipal, that.serverPrincipal)
+            && this.tcpNoDelay == that.tcpNoDelay
+            && isEqual(this.ticket, that.ticket);
+      }
+      return false;
+    }
+
+    /**
+     * Combines every field that takes part in {@link #equals(Object)}. Null
+     * fields contribute 0, mirroring the null-safe comparison in equals.
+     */
+    @Override
+    public int hashCode() {
+      int result = (connectionRetryPolicy == null) ? 0 : connectionRetryPolicy
+          .hashCode();
+      result = PRIME * result + ((address == null) ? 0 : address.hashCode());
+      result = PRIME * result + maxIdleTime;
+      result = PRIME * result + pingInterval;
+      result = PRIME * result + ((protocol == null) ? 0 : protocol.hashCode());
+      // fold the timeout in; assigning PRIME * rpcTimeout alone would throw
+      // away everything accumulated so far
+      result = PRIME * result + rpcTimeout;
+      result = PRIME * result
+          + ((serverPrincipal == null) ? 0 : serverPrincipal.hashCode());
+      result = PRIME * result + (tcpNoDelay ? 1231 : 1237);
+      result = PRIME * result + ((ticket == null) ? 0 : ticket.hashCode());
+      return result;
+    }
+  }
+}

Propchange: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncClient.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncRPC.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncRPC.java?rev=1628344&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncRPC.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncRPC.java Tue Sep 30 00:44:14 2014
@@ -0,0 +1,777 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.Array;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hama.util.BSPNetUtils;
+
+/**
+ * A simple RPC mechanism using netty.
+ * 
+ * A <i>protocol</i> is a Java interface. All parameters and return types must
+ * be one of:
+ * 
+ * <ul>
+ * <li>a primitive type, <code>boolean</code>, <code>byte</code>,
+ * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
+ * <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
+ * 
+ * <li>a {@link String}; or</li>
+ * 
+ * <li>a {@link Writable}; or</li>
+ * 
+ * <li>an array of the above types</li>
+ * </ul>
+ * 
+ * All methods in the protocol should throw only IOException. No field data of
+ * the protocol instance is transmitted.
+ */
+public class AsyncRPC {
+  private static final Log LOG = LogFactory.getLog(AsyncRPC.class);
+
+  /** Private constructor: this class is not meant to be instantiated. */
+  private AsyncRPC() {
+  } // no public ctor
+
+  /**
+   * Wire representation of one method invocation: the method name plus the
+   * declared parameter classes and the actual argument values.
+   */
+  @SuppressWarnings("rawtypes")
+  private static class Invocation implements Writable, Configurable {
+    private String methodName;
+    private Class[] parameterClasses;
+    private Object[] parameters;
+    private Configuration conf;
+
+    /** No-argument constructor; fields are filled in by {@link #readFields}. */
+    @SuppressWarnings("unused")
+    public Invocation() {
+    }
+
+    /**
+     * Captures the name and declared parameter types of <code>method</code>
+     * together with the given arguments.
+     * 
+     * @param method the method being invoked
+     * @param parameters the argument values
+     */
+    public Invocation(Method method, Object[] parameters) {
+      this.parameters = parameters;
+      this.parameterClasses = method.getParameterTypes();
+      this.methodName = method.getName();
+    }
+
+    /** The name of the method invoked. */
+    public String getMethodName() {
+      return methodName;
+    }
+
+    /** The parameter classes. */
+    public Class[] getParameterClasses() {
+      return parameterClasses;
+    }
+
+    /** The parameter instances. */
+    public Object[] getParameters() {
+      return parameters;
+    }
+
+    /**
+     * Reads the method name, the argument count and then each argument along
+     * with its declared class.
+     * 
+     * @param in the input to deserialize from
+     */
+    public void readFields(DataInput in) throws IOException {
+      methodName = Text.readString(in);
+      final int argCount = in.readInt();
+      parameters = new Object[argCount];
+      parameterClasses = new Class[argCount];
+      ObjectWritable holder = new ObjectWritable();
+      for (int idx = 0; idx < argCount; idx++) {
+        parameters[idx] = ObjectWritable.readObject(in, holder, this.conf);
+        parameterClasses[idx] = holder.getDeclaredClass();
+      }
+    }
+
+    /**
+     * Writes the method name, the argument count and then each argument with
+     * its declared class.
+     * 
+     * @param out the output to serialize to
+     */
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, methodName);
+      final int argCount = parameterClasses.length;
+      out.writeInt(argCount);
+      for (int idx = 0; idx < argCount; idx++) {
+        ObjectWritable.writeObject(out, parameters[idx],
+            parameterClasses[idx], conf);
+      }
+    }
+
+    /** Renders the invocation as <code>name(arg1, arg2, ...)</code>. */
+    @Override
+    public String toString() {
+      StringBuilder text = new StringBuilder();
+      text.append(methodName).append("(");
+      for (int idx = 0; idx < parameters.length; idx++) {
+        if (idx > 0) {
+          text.append(", ");
+        }
+        text.append(parameters[idx]);
+      }
+      return text.append(")").toString();
+    }
+
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+
+    public Configuration getConf() {
+      return this.conf;
+    }
+
+  }
+
+  /**
+   * Cache of IPC clients keyed by their socket factory. A cached client is
+   * reference counted via incCount/decCount and stopped once its count
+   * reaches zero.
+   */
+  static private class ClientCache {
+    private Map<SocketFactory, AsyncClient> clients = new HashMap<SocketFactory, AsyncClient>();
+
+    /**
+     * Returns the client cached for the given SocketFactory, creating and
+     * caching one if none exists yet. A cache hit increments the client's
+     * reference count.
+     * 
+     * @param conf Configuration
+     * @param factory the socket factory used as cache key
+     * @return an IPC client
+     */
+    private synchronized AsyncClient getClient(Configuration conf,
+        SocketFactory factory) {
+      // Construct & cache client. The configuration is only used for timeout,
+      // and Clients have connection pools. So we can either (a) lose some
+      // connection pooling and leak sockets, or (b) use the same timeout for
+      // all configurations. Since the IPC is usually intended globally, not
+      // per-job, we choose (a).
+      AsyncClient cached = clients.get(factory);
+      if (cached != null) {
+        cached.incCount();
+        return cached;
+      }
+      AsyncClient created = new AsyncClient(ObjectWritable.class, conf,
+          factory);
+      clients.put(factory, created);
+      return created;
+    }
+
+    /**
+     * Same as {@link #getClient(Configuration, SocketFactory)} using
+     * <code>SocketFactory.getDefault()</code> as the key.
+     * 
+     * @param conf Configuration
+     * @return an IPC client
+     */
+    private synchronized AsyncClient getClient(Configuration conf) {
+      SocketFactory defaultFactory = SocketFactory.getDefault();
+      return getClient(conf, defaultFactory);
+    }
+
+    /**
+     * Drops one reference to the client. When the reference count becomes
+     * zero the client is removed from the cache and then stopped; the stop
+     * happens outside of the cache lock.
+     * 
+     * @param client the client to release
+     */
+    private void stopClient(AsyncClient client) {
+      synchronized (this) {
+        client.decCount();
+        if (client.isZeroReference()) {
+          SocketFactory key = client.getSocketFactory();
+          clients.remove(key);
+        }
+      }
+      if (!client.isZeroReference()) {
+        return;
+      }
+      client.stop();
+    }
+  }
+
+  private static ClientCache CLIENTS = new ClientCache();
+
+  /**
+   * For unit testing only: returns the client that the shared client cache
+   * holds for the default socket factory.
+   * 
+   * @param conf configuration handed to the cache
+   * @return the cached (or newly created) client
+   */
+  static AsyncClient getClient(Configuration conf) {
+    return CLIENTS.getClient(conf);
+  }
+
+  /**
+   * Invocation handler behind the client-side proxies: every proxied method
+   * call is packed into an {@link Invocation} and sent through a client
+   * obtained from the shared client cache.
+   */
+  private static class Invoker implements InvocationHandler {
+    private AsyncClient.ConnectionId remoteId;
+    private AsyncClient client;
+    private boolean isClosed = false;
+
+    /**
+     * Builds the connection id for the target server and obtains the client
+     * for the given socket factory from the cache.
+     */
+    private Invoker(Class<? extends VersionedProtocol> protocol,
+        InetSocketAddress address, UserGroupInformation ticket,
+        Configuration conf, SocketFactory factory, int rpcTimeout,
+        RetryPolicy connectionRetryPolicy) throws IOException {
+      this.remoteId = AsyncClient.ConnectionId.getConnectionId(address,
+          protocol, ticket, rpcTimeout, connectionRetryPolicy, conf);
+      this.client = CLIENTS.getClient(conf, factory);
+    }
+
+    /**
+     * Performs the remote call for <code>method</code> and returns the
+     * unwrapped result. The call duration is logged at debug level.
+     */
+    public Object invoke(Object proxy, Method method, Object[] args)
+        throws Throwable {
+      final boolean debugEnabled = LOG.isDebugEnabled();
+      final long begin = debugEnabled ? System.currentTimeMillis() : 0;
+
+      Invocation invocation = new Invocation(method, args);
+      ObjectWritable value = (ObjectWritable) client.call(invocation, remoteId);
+
+      if (debugEnabled) {
+        long callTime = System.currentTimeMillis() - begin;
+        LOG.debug("Call: " + method.getName() + " " + callTime);
+      }
+      return value.get();
+    }
+
+    /**
+     * Releases this invoker's client through the client cache. Only the first
+     * invocation has an effect.
+     */
+    synchronized private void close() {
+      if (isClosed) {
+        return;
+      }
+      isClosed = true;
+      CLIENTS.stopClient(client);
+    }
+  }
+
+  /**
+   * Signals that client and server disagree on the version of an RPC
+   * protocol.
+   */
+  @SuppressWarnings("serial")
+  public static class VersionMismatch extends IOException {
+    private final String interfaceName;
+    private final long clientVersion;
+    private final long serverVersion;
+
+    /**
+     * Create a version mismatch exception
+     * 
+     * @param interfaceName the name of the protocol mismatch
+     * @param clientVersion the client's version of the protocol
+     * @param serverVersion the server's version of the protocol
+     */
+    public VersionMismatch(String interfaceName, long clientVersion,
+        long serverVersion) {
+      super(describe(interfaceName, clientVersion, serverVersion));
+      this.interfaceName = interfaceName;
+      this.clientVersion = clientVersion;
+      this.serverVersion = serverVersion;
+    }
+
+    /** Builds the exception message from the three values. */
+    private static String describe(String name, long client, long server) {
+      return "Protocol " + name + " version mismatch. (client = " + client
+          + ", server = " + server + ")";
+    }
+
+    /**
+     * Get the interface name
+     * 
+     * @return the java class name (eg.
+     *         org.apache.hadoop.mapred.InterTrackerProtocol)
+     */
+    public String getInterfaceName() {
+      return interfaceName;
+    }
+
+    /**
+     * Get the client's preferred version
+     */
+    public long getClientVersion() {
+      return clientVersion;
+    }
+
+    /**
+     * Get the server's agreed to version.
+     */
+    public long getServerVersion() {
+      return serverVersion;
+    }
+  }
+
+  /**
+   * Get a proxy connection to a remote server, using an rpc timeout of 0 and
+   * a connection timeout of {@code Long.MAX_VALUE} milliseconds.
+   * 
+   * @param protocol protocol class
+   * @param clientVersion client version
+   * @param addr remote address
+   * @param conf configuration to use
+   * @return the proxy
+   * @throws IOException if the far end threw a RemoteException
+   */
+  public static VersionedProtocol waitForProxy(
+      Class<? extends VersionedProtocol> protocol, long clientVersion,
+      InetSocketAddress addr, Configuration conf) throws IOException {
+    return waitForProxy(protocol, clientVersion, addr, conf, 0, Long.MAX_VALUE);
+  }
+
+  /**
+   * Get a proxy connection to a remote server, using an rpc timeout of 0.
+   * 
+   * @param protocol protocol class
+   * @param clientVersion client version
+   * @param addr remote address
+   * @param conf configuration to use
+   * @param connTimeout time in milliseconds before giving up
+   * @return the proxy
+   * @throws IOException if the far end threw a RemoteException
+   */
+  static VersionedProtocol waitForProxy(
+      Class<? extends VersionedProtocol> protocol, long clientVersion,
+      InetSocketAddress addr, Configuration conf, long connTimeout)
+      throws IOException {
+    return waitForProxy(protocol, clientVersion, addr, conf, 0, connTimeout);
+  }
+
+  /**
+   * Get a proxy connection to a remote server, retrying once per second while
+   * the server refuses the connection or times out.
+   * 
+   * @param protocol protocol class
+   * @param clientVersion client version
+   * @param addr remote address
+   * @param conf configuration to use
+   * @param rpcTimeout rpc timeout
+   * @param connTimeout time in milliseconds before giving up
+   * @return the proxy
+   * @throws IOException if the far end threw a RemoteException, if the server
+   *           could not be reached within <code>connTimeout</code> (the last
+   *           connect failure is rethrown), or if the calling thread is
+   *           interrupted while waiting for the next attempt
+   */
+  static VersionedProtocol waitForProxy(
+      Class<? extends VersionedProtocol> protocol, long clientVersion,
+      InetSocketAddress addr, Configuration conf, int rpcTimeout,
+      long connTimeout) throws IOException {
+    long startTime = System.currentTimeMillis();
+    IOException ioe;
+    while (true) {
+      try {
+        return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
+      } catch (ConnectException se) { // server has not been started
+        LOG.info("Server at " + addr + " not available yet, Zzzzz...");
+        ioe = se;
+      } catch (SocketTimeoutException te) { // server is busy
+        LOG.info("Problem connecting to server: " + addr);
+        ioe = te;
+      }
+      // check if timed out
+      if (System.currentTimeMillis() - connTimeout >= startTime) {
+        throw ioe;
+      }
+
+      // wait for retry
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        // Do not keep retrying (possibly forever) after an interrupt:
+        // restore the flag and give up, keeping the last connect failure as
+        // the cause for diagnostics.
+        Thread.currentThread().interrupt();
+        throw (IOException) new InterruptedIOException(
+            "Interrupted while waiting for server at " + addr).initCause(ioe);
+      }
+    }
+  }
+
+  /**
+   * Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address. The current user (as returned
+   * by {@code UserGroupInformation.getCurrentUser()}) is used as ticket and
+   * the rpc timeout is 0.
+   * 
+   * @param protocol protocol class
+   * @param clientVersion client version
+   * @param addr remote address
+   * @param conf configuration to use
+   * @param factory socket factory used to select the cached client
+   * @return the proxy
+   * @throws IOException
+   */
+  public static VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol, long clientVersion,
+      InetSocketAddress addr, Configuration conf, SocketFactory factory)
+      throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    return getProxy(protocol, clientVersion, addr, ugi, conf, factory, 0);
+  }
+
+  /**
+   * Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address.
+   * <p>
+   * NOTE(review): unlike the overload without <code>rpcTimeout</code>, this
+   * one passes a <code>null</code> ticket instead of the current user --
+   * confirm that this is intended.
+   * 
+   * @param protocol protocol class
+   * @param clientVersion client version
+   * @param addr remote address
+   * @param conf configuration to use
+   * @param factory socket factory used to select the cached client
+   * @param rpcTimeout rpc timeout
+   * @return the proxy
+   * @throws IOException
+   */
+  public static VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol, long clientVersion,
+      InetSocketAddress addr, Configuration conf, SocketFactory factory,
+      int rpcTimeout) throws IOException {
+    return getProxy(protocol, clientVersion, addr, null, conf, factory,
+        rpcTimeout);
+  }
+
+  /**
+   * Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address, with an rpc timeout of 0.
+   * 
+   * @param protocol protocol class
+   * @param clientVersion client version
+   * @param addr remote address
+   * @param ticket credentials to use for the connection
+   * @param conf configuration to use
+   * @param factory socket factory used to select the cached client
+   * @return the proxy
+   * @throws IOException
+   */
+  public static VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol, long clientVersion,
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory) throws IOException {
+    return getProxy(protocol, clientVersion, addr, ticket, conf, factory, 0);
+  }
+
+  /**
+   * Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address. No explicit retry policy is
+   * passed (null) and the protocol version is checked.
+   * 
+   * @param protocol protocol class
+   * @param clientVersion client version
+   * @param addr remote address
+   * @param ticket credentials to use for the connection
+   * @param conf configuration to use
+   * @param factory socket factory used to select the cached client
+   * @param rpcTimeout rpc timeout
+   * @return the proxy
+   * @throws IOException
+   */
+  public static VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol, long clientVersion,
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory, int rpcTimeout) throws IOException {
+    return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+        rpcTimeout, null, true);
+  }
+
+  /**
+   * Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address. When {@code checkVersion} is
+   * set, the server's protocol version is compared with
+   * {@code clientVersion} before the proxy is handed out; if that check
+   * fails, the invoker behind the proxy is closed before the failure is
+   * propagated, because the caller never receives the proxy and could not
+   * stop it.
+   *
+   * @param protocol the protocol interface the returned proxy implements
+   * @param clientVersion the protocol version the client speaks
+   * @param addr the address of the server
+   * @param ticket the user information handed to the invoker
+   * @param conf the configuration handed to the invoker
+   * @param factory the socket factory handed to the invoker
+   * @param rpcTimeout the RPC timeout handed to the invoker
+   * @param connectionRetryPolicy the retry policy handed to the invoker
+   * @param checkVersion whether to verify the server's protocol version
+   *          before returning the proxy
+   * @return the proxy
+   * @throws IOException if the version check fails
+   */
+  public static VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol, long clientVersion,
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
+      boolean checkVersion) throws IOException {
+
+    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
+        rpcTimeout, connectionRetryPolicy);
+    VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(
+        protocol.getClassLoader(), new Class[] { protocol }, invoker);
+
+    if (checkVersion) {
+      boolean verified = false;
+      try {
+        checkVersion(protocol, clientVersion, proxy);
+        verified = true;
+      } finally {
+        // The proxy is unreachable for the caller when the check throws, so
+        // release the invoker here instead of leaking it.
+        if (!verified) {
+          invoker.close();
+        }
+      }
+    }
+    return proxy;
+  }
+
+  /**
+   * Ask the server for its protocol version through the given proxy and
+   * compare it with the client's version.
+   *
+   * @param protocol the protocol whose class name is sent to the server
+   * @param clientVersion the protocol version the client speaks
+   * @param proxy the proxy used to query the server's version
+   * @throws IOException if the version query fails; a {@link VersionMismatch}
+   *           is thrown when the two versions differ
+   */
+  public static void checkVersion(Class<? extends VersionedProtocol> protocol,
+      long clientVersion, VersionedProtocol proxy) throws IOException {
+    final long serverVersion = proxy.getProtocolVersion(protocol.getName(),
+        clientVersion);
+    if (serverVersion == clientVersion) {
+      return;
+    }
+    throw new VersionMismatch(protocol.getName(), clientVersion, serverVersion);
+  }
+
+  /**
+   * Construct a client-side proxy object with the default SocketFactory. The
+   * factory is obtained from {@code BSPNetUtils.getDefaultSocketFactory(conf)}
+   * and {@code 0} is used as RPC timeout.
+   *
+   * @param protocol the protocol interface the proxy is created for
+   * @param clientVersion the protocol version the client speaks
+   * @param addr the address of the server
+   * @param conf the configuration, also used to look up the socket factory
+   * @return a proxy instance
+   * @throws IOException if the delegate overload fails
+   */
+  public static VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol, long clientVersion,
+      InetSocketAddress addr, Configuration conf) throws IOException {
+    final int rpcTimeout = 0;
+    return getProxy(protocol, clientVersion, addr, conf,
+        BSPNetUtils.getDefaultSocketFactory(conf), rpcTimeout);
+  }
+
+  /**
+   * Get VersionedProtocol
+   * 
+   * @param protocol
+   * @param clientVersion
+   * @param addr
+   * @param conf
+   * @param rpcTimeout
+   * @return the proxy
+   * @throws IOException
+   */
+  public static VersionedProtocol getProxy(
+      Class<? extends VersionedProtocol> protocol, long clientVersion,
+      InetSocketAddress addr, Configuration conf, int rpcTimeout)
+      throws IOException {
+
+    return getProxy(protocol, clientVersion, addr, conf,
+        BSPNetUtils.getDefaultSocketFactory(conf), rpcTimeout);
+  }
+
+  /**
+   * Stop this proxy and release its invoker's resource. A {@code null} proxy
+   * is ignored.
+   *
+   * @param proxy the proxy to be stopped; its invocation handler is expected
+   *          to be an {@code Invoker}
+   */
+  public static void stopProxy(VersionedProtocol proxy) {
+    if (proxy == null) {
+      return;
+    }
+    final Invoker invoker = (Invoker) Proxy.getInvocationHandler(proxy);
+    invoker.close();
+  }
+
+  /**
+   * Expert: Make multiple, parallel calls to a set of servers.
+   * 
+   * @param method
+   * @param params
+   * @param addrs
+   * @param ticket
+   * @param conf
+   * @return response object array
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public static Object[] call(Method method, Object[][] params,
+      InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf)
+      throws IOException, InterruptedException {
+
+    Invocation[] invocations = new Invocation[params.length];
+    for (int i = 0; i < params.length; i++)
+      invocations[i] = new Invocation(method, params[i]);
+    AsyncClient client = CLIENTS.getClient(conf);
+    try {
+      Writable[] wrappedValues = client.call(invocations, addrs,
+          method.getDeclaringClass(), ticket, conf);
+
+      if (method.getReturnType() == Void.TYPE) {
+        return null;
+      }
+
+      Object[] values = (Object[]) Array.newInstance(method.getReturnType(),
+          wrappedValues.length);
+      for (int i = 0; i < values.length; i++)
+        if (wrappedValues[i] != null)
+          values[i] = ((ObjectWritable) wrappedValues[i]).get();
+
+      return values;
+    } finally {
+      CLIENTS.stopClient(client);
+    }
+  }
+
+  /**
+   * Construct a server for a protocol implementation instance listening on a
+   * port and address. Uses a single handler and non-verbose mode.
+   *
+   * @param instance the protocol implementation the server exposes
+   * @param bindAddress the address to bind on
+   * @param port the port to listen on
+   * @param conf the configuration forwarded to the delegate overload
+   * @return server
+   * @throws IOException if the delegate overload fails
+   */
+  public static NioServer getServer(final Object instance,
+      final String bindAddress, final int port, Configuration conf)
+      throws IOException {
+    final int numHandlers = 1;
+    final boolean verbose = false;
+    return getServer(instance, bindAddress, port, numHandlers, verbose, conf);
+  }
+
+  /**
+   * Construct a server for a protocol implementation instance listening on a
+   * port and address. No secret manager is supplied ({@code null} is
+   * forwarded).
+   *
+   * @param instance the protocol implementation the server exposes
+   * @param bindAddress the address to bind on
+   * @param port the port to listen on
+   * @param numHandlers the number of handlers forwarded to the server
+   * @param verbose whether the server should log each call
+   * @param conf the configuration forwarded to the delegate overload
+   * @return server
+   * @throws IOException if the delegate overload fails
+   */
+  public static NioServer getServer(final Object instance,
+      final String bindAddress, final int port, final int numHandlers,
+      final boolean verbose, Configuration conf) throws IOException {
+    final SecretManager<? extends TokenIdentifier> secretManager = null;
+    return getServer(instance, bindAddress, port, numHandlers, verbose, conf,
+        secretManager);
+  }
+
+  /**
+   * Construct a server for a protocol implementation instance listening on a
+   * port and address, with a secret manager.
+   *
+   * @param instance the protocol implementation the server exposes
+   * @param bindAddress the address to bind on
+   * @param port the port to listen on
+   * @param numHandlers the number of handlers handed to the server
+   * @param verbose whether the server should log each call
+   * @param conf the configuration handed to the server
+   * @param secretManager the secret manager handed to the server
+   * @return server
+   * @throws IOException if the server cannot be constructed
+   */
+  public static NioServer getServer(final Object instance,
+      final String bindAddress, final int port, final int numHandlers,
+      final boolean verbose, Configuration conf,
+      SecretManager<? extends TokenIdentifier> secretManager)
+      throws IOException {
+    final NioServer server = new NioServer(instance, conf, bindAddress, port,
+        numHandlers, verbose, secretManager);
+    return server;
+  }
+
+  /** An RPC Server. */
+  public static class NioServer extends org.apache.hama.ipc.AsyncServer {
+    private Object instance;
+    private boolean verbose;
+
+    /**
+     * Construct an RPC server.
+     * 
+     * @param instance the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     * @throws IOException
+     */
+    public NioServer(Object instance, Configuration conf, String bindAddress,
+        int port) throws IOException {
+      this(instance, conf, bindAddress, port, 1, false, null);
+    }
+
+    private static String classNameBase(String className) {
+      String[] names = className.split("\\.", -1);
+      if (names == null || names.length == 0) {
+        return className;
+      }
+      return names[names.length - 1];
+    }
+
+    /**
+     * Construct an RPC server.
+     * 
+     * @param instance the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     * @param numHandlers the number of method handler threads to run
+     * @param verbose whether each call should be logged
+     * @throws IOException
+     */
+    public NioServer(Object instance, Configuration conf, String bindAddress,
+        int port, int numHandlers, boolean verbose,
+        SecretManager<? extends TokenIdentifier> secretManager)
+        throws IOException {
+      super(bindAddress, port, Invocation.class, numHandlers, conf,
+          classNameBase(instance.getClass().getName()), secretManager);
+      this.instance = instance;
+      this.verbose = verbose;
+    }
+
+    public Writable call(Class<?> protocol, Writable param, long receivedTime)
+        throws IOException {
+      try {
+        Invocation call = (Invocation) param;
+        if (verbose)
+          log("Call: " + call);
+
+        Method method = protocol.getMethod(call.getMethodName(),
+            call.getParameterClasses());
+        method.setAccessible(true);
+
+        long startTime = System.currentTimeMillis();
+        Object value = method.invoke(instance, call.getParameters());
+        int processingTime = (int) (System.currentTimeMillis() - startTime);
+        int qTime = (int) (startTime - receivedTime);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Served: " + call.getMethodName() + " queueTime= " + qTime
+              + " procesingTime= " + processingTime);
+        }
+        if (verbose)
+          log("Return: " + value);
+
+        return new ObjectWritable(method.getReturnType(), value);
+
+      } catch (InvocationTargetException e) {
+        Throwable target = e.getTargetException();
+        if (target instanceof IOException) {
+          throw (IOException) target;
+        } else {
+          IOException ioe = new IOException(target.toString());
+          ioe.setStackTrace(target.getStackTrace());
+          throw ioe;
+        }
+      } catch (Throwable e) {
+        if (!(e instanceof IOException)) {
+          LOG.error("Unexpected throwable object ", e);
+        }
+        IOException ioe = new IOException(e.toString());
+        ioe.setStackTrace(e.getStackTrace());
+        throw ioe;
+      }
+    }
+  }
+
+  private static void log(String value) {
+    if (value != null && value.length() > 55)
+      value = value.substring(0, 55) + "...";
+    LOG.info(value);
+  }
+}

Propchange: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncRPC.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hama/trunk/core/src/main/java/org/apache/hama/ipc/AsyncRPC.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain