You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2008/12/08 02:43:45 UTC

svn commit: r724231 [2/2] - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apache/hadoop/hbase/master/ src/java/org/apache/hadoop/hbase/regionserver/ src/java/org/apache/hadoop/ipc/

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=724231&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Sun Dec  7 17:43:45 2008
@@ -0,0 +1,1061 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/** An abstract IPC service.  IPC calls take a single {@link Writable} as a
+ * parameter, and return a {@link Writable} as their value.  A service runs on
+ * a port and is defined by a parameter class and a value class.
+ * 
+ * 
+ * <p>Copied local so can fix HBASE-900.
+ * 
+ * @see HBaseClient
+ */
+public abstract class HBaseServer {
+  
+  /** Signature that opens every RPC connection: the four bytes "hrpc". */
+  public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+
+  /**
+   * Protocol version this server accepts; a connection announcing any other
+   * version is rejected while its header is read.
+   * 1 : Introduce ping and server does not throw away RPCs.
+   * NOTE(review): the value is 2 but only the change for version 1 is
+   * recorded here -- document what version 2 introduced.
+   */
+  public static final byte CURRENT_VERSION = 2;
+
+  /** Queue capacity granted per handler thread (calls per handler). */
+  private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
+
+  // NOTE(review): the logger name keeps the org.apache.hadoop.ipc prefix
+  // although this class lives in org.apache.hadoop.hbase.ipc.
+  public static final Log LOG =
+      LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
+
+  /** The server owning the current thread; set by each server thread on start. */
+  private static final ThreadLocal<HBaseServer> SERVER =
+      new ThreadLocal<HBaseServer>();
+
+  /**
+   * Returns the server that owns the calling thread, or null when the thread
+   * does not belong to a server. May be used from
+   * {@link #call(Writable, long)} implementations and from the
+   * {@link Writable} methods of parameters and return values, so that
+   * applications can reach the server context.
+   */
+  public static HBaseServer get() {
+    final HBaseServer current = SERVER.get();
+    return current;
+  }
+ 
+  /**
+   * The Call a Handler is currently invoking: set just before the RPC is
+   * made and cleared once it returns.
+   */
+  private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
+
+  /**
+   * Returns the remote side's IP address when invoked inside an RPC, or
+   * null when no call is in progress on this thread.
+   */
+  public static InetAddress getRemoteIp() {
+    final Call active = CurCall.get();
+    if (active == null) {
+      return null;
+    }
+    return active.connection.socket.getInetAddress();
+  }
+  /**
+   * Returns the remote side's IP address in textual form when invoked inside
+   * an RPC, or null when {@link #getRemoteIp()} has no address.
+   */
+  public static String getRemoteAddress() {
+    final InetAddress ip = getRemoteIp();
+    if (ip == null) {
+      return null;
+    }
+    return ip.getHostAddress();
+  }
+
+  /** Host/address the listener binds to. */
+  private String bindAddress;
+  /** Port we listen on (replaced by the actual port once bound). */
+  private int port;
+  /** Number of handler threads. */
+  private int handlerCount;
+  /** Class of call parameters. */
+  private Class<? extends Writable> paramClass;
+  /** Idle time (ms) after which a client without outstanding calls may be disconnected. */
+  private int maxIdleTime;
+  /** Connection count above which idle connections start being cleaned up. */
+  private int thresholdIdleConnections;
+  /** Upper bound on connections closed during a single non-forced cleanup. */
+  int maxConnectionsToNuke;
+
+  protected HBaseRpcMetrics  rpcMetrics;
+
+  private Configuration conf;
+
+  private int maxQueueSize;
+  private int socketSendBufferSize;
+  /** When true, Nagle's algorithm is disabled on accepted sockets. */
+  private final boolean tcpNoDelay;
+
+  /** True while the server runs. */
+  private volatile boolean running = true;
+  /** Calls waiting to be picked up by a handler. */
+  private BlockingQueue<Call> callQueue;
+
+  /** Currently open client connections. */
+  private List<Connection> connectionList =
+    Collections.synchronizedList(new LinkedList<Connection>());
+  private Listener listener = null;
+  private Responder responder = null;
+  /** Count of open connections; incremented under the connectionList lock in doAccept. */
+  private int numConnections = 0;
+  private Handler[] handlers = null;
+
+  /**
+   * A convenience method to bind to a given address and report 
+   * better exceptions if the address is not a valid host.
+   * @param socket the socket to bind
+   * @param address the address to bind to
+   * @param backlog the number of connections allowed in the queue
+   * @throws BindException if the address can't be bound
+   * @throws UnknownHostException if the address isn't a valid host name
+   * @throws IOException other random errors from bind
+   */
+  public static void bind(ServerSocket socket, InetSocketAddress address, 
+                          int backlog) throws IOException {
+    try {
+      socket.bind(address, backlog);
+    } catch (BindException e) {
+      // Re-throw with the address in the message, keeping the original cause.
+      BindException bindException = new BindException("Problem binding to " + address
+                                                      + " : " + e.getMessage());
+      bindException.initCause(e);
+      throw bindException;
+    } catch (SocketException e) {
+      // If they try to bind to a different host's address, give a better
+      // error message -- but keep the underlying exception as the cause so
+      // the original stack trace is not lost.
+      if ("Unresolved address".equals(e.getMessage())) {
+        UnknownHostException unknownHost =
+          new UnknownHostException("Invalid hostname for server: " + 
+                                   address.getHostName());
+        unknownHost.initCause(e);
+        throw unknownHost;
+      }
+      throw e;
+    }
+  }
+
+  /** A call queued for handling. */
+  private static class Call {
+    /** Call id chosen by the client. */
+    private int id;
+    /** Parameter sent with the call. */
+    private Writable param;
+    /** Connection the call arrived on, used to send the response. */
+    private Connection connection;
+    /**
+     * Time the call was received while {@link #response} is null; the time
+     * it was served once a response exists.
+     */
+    private long timestamp;
+    /** Serialized response for this call, or null until one is set. */
+    private ByteBuffer response;
+
+    public Call(int id, Writable param, Connection connection) {
+      this.connection = connection;
+      this.param = param;
+      this.id = id;
+      this.response = null;
+      this.timestamp = System.currentTimeMillis();
+    }
+
+    @Override
+    public String toString() {
+      final String what = param.toString();
+      final String who = connection.toString();
+      return what + " from " + who;
+    }
+
+    /** Attaches the serialized response to send back to the client. */
+    public void setResponse(ByteBuffer response) {
+      this.response = response;
+    }
+  }
+
+  /**
+   * Listens on the server socket: accepts new connections and reads calls
+   * from existing ones on a single selector thread. Complete calls are
+   * queued (via {@link Connection#readAndProcess()}) for the handler threads.
+   */
+  private class Listener extends Thread {
+    
+    private ServerSocketChannel acceptChannel = null; //the accept channel
+    private Selector selector = null; //the selector that we use for the server
+    private InetSocketAddress address; //the address we bind at
+    private Random rand = new Random();
+    private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
+                                         //-tion (for idle connections) ran
+    private long cleanupInterval = 10000; //the minimum interval between 
+                                          //two cleanup runs
+    private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
+    
+    /**
+     * Opens a non-blocking server channel bound to bindAddress:port and
+     * registers it for accepts. If port was 0, the outer port field is
+     * updated to the ephemeral port actually bound.
+     */
+    public Listener() throws IOException {
+      address = new InetSocketAddress(bindAddress, port);
+      // Create a new server socket and set to non blocking mode
+      acceptChannel = ServerSocketChannel.open();
+      acceptChannel.configureBlocking(false);
+
+      // Bind the server socket to the local host and port
+      bind(acceptChannel.socket(), address, backlogLength);
+      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
+      // create a selector;
+      selector= Selector.open();
+
+      // Register accepts on the server socket with the selector.
+      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
+      this.setName("IPC Server listener on " + port);
+      this.setDaemon(true);
+    }
+
+    /** cleanup connections from connectionList. Choose a random range
+     * to scan and also have a limit on the number of the connections
+     * that will be cleanedup per run. The criteria for cleanup is the time
+     * for which the connection was idle. If 'force' is true then all 
+     * connections will be looked at for the cleanup.
+     */
+    private void cleanupConnections(boolean force) {
+      if (force || numConnections > thresholdIdleConnections) {
+        long currentTime = System.currentTimeMillis();
+        if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
+          return;
+        }
+        // Snapshot the count: connections can be closed concurrently.
+        int total = numConnections;
+        int start = 0;
+        int end = total - 1;
+        if (!force && total > 0) {
+          // nextInt(bound) is always in [0, bound). The previous
+          // "rand.nextInt() % numConnections" was negative about half the
+          // time, which made connectionList.get() throw and silently
+          // aborted the whole cleanup pass.
+          start = rand.nextInt(total);
+          end = rand.nextInt(total);
+          if (end < start) {
+            int temp = start;
+            start = end;
+            end = temp;
+          }
+        }
+        int i = start;
+        int numNuked = 0;
+        while (i <= end) {
+          Connection c;
+          synchronized (connectionList) {
+            try {
+              c = connectionList.get(i);
+            } catch (IndexOutOfBoundsException e) {
+              // The list shrank underneath us; stop this pass.
+              return;
+            }
+          }
+          if (c.timedOut(currentTime)) {
+            if (LOG.isDebugEnabled())
+              LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
+            closeConnection(c);
+            numNuked++;
+            // Removal shifts later entries down, so keep i and shrink the range.
+            end--;
+            if (!force && numNuked == maxConnectionsToNuke) break;
+          } else {
+            i++;
+          }
+        }
+        lastCleanupRunTime = System.currentTimeMillis();
+      }
+    }
+
+    @Override
+    public void run() {
+      LOG.info(getName() + ": starting");
+      SERVER.set(HBaseServer.this);
+      while (running) {
+        SelectionKey key = null;
+        try {
+          selector.select();
+          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
+          while (iter.hasNext()) {
+            key = iter.next();
+            iter.remove();
+            try {
+              if (key.isValid()) {
+                if (key.isAcceptable())
+                  doAccept(key);
+                else if (key.isReadable())
+                  doRead(key);
+              }
+            } catch (IOException e) {
+              // Best effort: a failure on one key must not stop the listener,
+              // but do not hide it completely.
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(getName() + ": error handling selection key: " + e);
+              }
+            }
+            key = null;
+          }
+        } catch (OutOfMemoryError e) {
+          // we can run out of memory if we have too many threads
+          // log the event and sleep for a minute and give 
+          // some thread(s) a chance to finish
+          LOG.warn("Out of Memory in server select", e);
+          closeCurrentConnection(key, e);
+          cleanupConnections(true);
+          try { Thread.sleep(60000); } catch (Exception ie) {}
+        } catch (InterruptedException e) {
+          if (running) {                          // unexpected -- log it
+            LOG.info(getName() + " caught: " +
+                     StringUtils.stringifyException(e));
+          }
+        } catch (Exception e) {
+          closeCurrentConnection(key, e);
+        }
+        cleanupConnections(false);
+      }
+      LOG.info("Stopping " + this.getName());
+
+      synchronized (this) {
+        try {
+          acceptChannel.close();
+          selector.close();
+        } catch (IOException e) { }
+
+        selector= null;
+        acceptChannel= null;
+        
+        // clean up all connections
+        while (!connectionList.isEmpty()) {
+          closeConnection(connectionList.remove(0));
+        }
+      }
+    }
+
+    /** Closes the connection attached to key, if any. */
+    private void closeCurrentConnection(SelectionKey key, Throwable e) {
+      if (key != null) {
+        Connection c = (Connection)key.attachment();
+        if (c != null) {
+          if (LOG.isDebugEnabled())
+            LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
+          closeConnection(c);
+        }
+      }
+    }
+
+    /** Returns the local address the accept channel is bound to. */
+    InetSocketAddress getAddress() {
+      return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
+    }
+    
+    /**
+     * Accepts up to 10 pending connections, registers each for reads and
+     * appends a new Connection for it to connectionList.
+     */
+    void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
+      Connection c = null;
+      ServerSocketChannel server = (ServerSocketChannel) key.channel();
+      // accept up to 10 connections
+      for (int i=0; i<10; i++) {
+        SocketChannel channel = server.accept();
+        if (channel==null) return;
+
+        channel.configureBlocking(false);
+        channel.socket().setTcpNoDelay(tcpNoDelay);
+        SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
+        c = new Connection(readKey, channel, System.currentTimeMillis());
+        readKey.attach(c);
+        synchronized (connectionList) {
+          connectionList.add(numConnections, c);
+          numConnections++;
+        }
+        if (LOG.isDebugEnabled())
+          LOG.debug("Server connection from " + c.toString() +
+              "; # active connections: " + numConnections +
+              "; # queued calls: " + callQueue.size());
+      }
+    }
+
+    /**
+     * Reads from the connection attached to key; closes it when the read
+     * reports end-of-stream or processing fails.
+     */
+    void doRead(SelectionKey key) throws InterruptedException {
+      int count = 0;
+      Connection c = (Connection)key.attachment();
+      if (c == null) {
+        return;  
+      }
+      c.setLastContact(System.currentTimeMillis());
+      
+      try {
+        count = c.readAndProcess();
+      } catch (InterruptedException ieo) {
+        throw ieo;
+      } catch (Exception e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
+        }
+        count = -1; //so that the (count < 0) block is executed
+      }
+      if (count < 0) {
+        if (LOG.isDebugEnabled())
+          LOG.debug(getName() + ": disconnecting client " + 
+                    c.getHostAddress() + ". Number of active connections: "+
+                    numConnections);
+        closeConnection(c);
+      } else {
+        c.setLastContact(System.currentTimeMillis());
+      }
+    }   
+
+    /** Wakes the selector and closes the listening socket so run() can exit. */
+    synchronized void doStop() {
+      if (selector != null) {
+        selector.wakeup();
+        Thread.yield();
+      }
+      if (acceptChannel != null) {
+        try {
+          acceptChannel.socket().close();
+        } catch (IOException e) {
+          LOG.info(getName() + ":Exception in closing listener socket. " + e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Sends responses of RPC back to clients. A Handler first tries to write a
+   * response directly (see doRespond); whatever cannot be written in one
+   * non-blocking write is finished by this thread, which selects on
+   * writeSelector for OP_WRITE. Queued responses older than PURGE_INTERVAL
+   * cause their connection to be closed.
+   */
+  private class Responder extends Thread {
+    private Selector writeSelector;
+    private int pending;         // connections waiting to register
+    
+    final static int PURGE_INTERVAL = 900000; // 15mins
+
+    Responder() throws IOException {
+      this.setName("IPC Server Responder");
+      this.setDaemon(true);
+      writeSelector = Selector.open(); // create a selector
+      pending = 0;
+    }
+
+    /**
+     * Main loop: completes pending writes and, at most once per
+     * PURGE_INTERVAL, checks registered calls for stale responses.
+     */
+    @Override
+    public void run() {
+      LOG.info(getName() + ": starting");
+      SERVER.set(HBaseServer.this);
+      long lastPurgeTime = 0;   // last check for old calls.
+
+      while (running) {
+        try {
+          waitPending();     // If a channel is being registered, wait.
+          // Timeout bounds the wait so the purge check below still runs
+          // when no channel becomes writable.
+          writeSelector.select(PURGE_INTERVAL);
+          Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
+          while (iter.hasNext()) {
+            SelectionKey key = iter.next();
+            iter.remove();
+            try {
+              if (key.isValid() && key.isWritable()) {
+                  doAsyncWrite(key);
+              }
+            } catch (IOException e) {
+              LOG.info(getName() + ": doAsyncWrite threw exception " + e);
+            }
+          }
+          long now = System.currentTimeMillis();
+          if (now < lastPurgeTime + PURGE_INTERVAL) {
+            continue;
+          }
+          lastPurgeTime = now;
+          //
+          // If there were some calls that have not been sent out for a
+          // long time, discard them.
+          //
+          LOG.debug("Checking for old call responses.");
+          ArrayList<Call> calls;
+          
+          // get the list of channels from list of keys.
+          // Collect under the key-set lock, purge afterwards, so that
+          // closing connections does not happen while holding it.
+          synchronized (writeSelector.keys()) {
+            calls = new ArrayList<Call>(writeSelector.keys().size());
+            iter = writeSelector.keys().iterator();
+            while (iter.hasNext()) {
+              SelectionKey key = iter.next();
+              Call call = (Call)key.attachment();
+              if (call != null && key.channel() == call.connection.channel) { 
+                calls.add(call);
+              }
+            }
+          }
+          
+          for(Call call : calls) {
+            try {
+              doPurge(call, now);
+            } catch (IOException e) {
+              LOG.warn("Error in purging old calls " + e);
+            }
+          }
+        } catch (OutOfMemoryError e) {
+          //
+          // we can run out of memory if we have too many threads
+          // log the event and sleep for a minute and give
+          // some thread(s) a chance to finish
+          //
+          LOG.warn("Out of Memory in server select", e);
+          try { Thread.sleep(60000); } catch (Exception ie) {}
+        } catch (Exception e) {
+          LOG.warn("Exception in Responder " + 
+                   StringUtils.stringifyException(e));
+        }
+      }
+      LOG.info("Stopping " + this.getName());
+    }
+
+    /**
+     * Continues writing the response queue of the connection whose channel
+     * became writable. Once the queue is drained, write interest is cleared.
+     * @throws IOException if the key's attached call belongs to another channel
+     */
+    private void doAsyncWrite(SelectionKey key) throws IOException {
+      Call call = (Call)key.attachment();
+      if (call == null) {
+        return;
+      }
+      if (key.channel() != call.connection.channel) {
+        throw new IOException("doAsyncWrite: bad channel");
+      }
+
+      synchronized(call.connection.responseQueue) {
+        if (processResponse(call.connection.responseQueue, false)) {
+          try {
+            key.interestOps(0);
+          } catch (CancelledKeyException e) {
+            /* The Listener/reader might have closed the socket.
+             * We don't explicitly cancel the key, so not sure if this will
+             * ever fire.
+             * This warning could be removed.
+             */
+            LOG.warn("Exception while changing ops : " + e);
+          }
+        }
+      }
+    }
+
+    //
+    // Remove calls that have been pending in the responseQueue 
+    // for a long time.
+    // In practice: if any queued response on the call's connection has a
+    // timestamp older than PURGE_INTERVAL, the whole connection is closed.
+    // Note the loop reuses the 'call' parameter as its iteration variable.
+    //
+    private void doPurge(Call call, long now) throws IOException {
+      LinkedList<Call> responseQueue = call.connection.responseQueue;
+      synchronized (responseQueue) {
+        Iterator<Call> iter = responseQueue.listIterator(0);
+        while (iter.hasNext()) {
+          call = iter.next();
+          if (now > call.timestamp + PURGE_INTERVAL) {
+            closeConnection(call.connection);
+            break;
+          }
+        }
+      }
+    }
+
+    // Processes one response. Returns true if there are no more pending
+    // data for this channel.
+    //
+    // inHandler is true when called from a Handler thread (via doRespond);
+    // only then is a partially written response registered with
+    // writeSelector so this thread can finish it.
+    // Any exception -- and also a negative write count, which returns before
+    // 'error' is cleared -- makes the finally block close the connection.
+    //
+    private boolean processResponse(LinkedList<Call> responseQueue,
+                                    boolean inHandler) throws IOException {
+      boolean error = true;
+      boolean done = false;       // true once nothing more is pending for this channel.
+      int numElements = 0;
+      Call call = null;
+      try {
+        synchronized (responseQueue) {
+          //
+          // If there are no items for this channel, then we are done
+          //
+          numElements = responseQueue.size();
+          if (numElements == 0) {
+            error = false;
+            return true;              // no more data for this channel.
+          }
+          //
+          // Extract the first call
+          //
+          call = responseQueue.removeFirst();
+          SocketChannel channel = call.connection.channel;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(getName() + ": responding to #" + call.id + " from " +
+                      call.connection);
+          }
+          //
+          // Send as much data as we can in the non-blocking fashion
+          //
+          int numBytes = channel.write(call.response);
+          if (numBytes < 0) {
+            return true;
+          }
+          if (!call.response.hasRemaining()) {
+            call.connection.decRpcCount();
+            if (numElements == 1) {    // last call fully processes.
+              done = true;             // no more data for this channel.
+            } else {
+              done = false;            // more calls pending to be sent.
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+                        call.connection + " Wrote " + numBytes + " bytes.");
+            }
+          } else {
+            //
+            // If we were unable to write the entire response out, then 
+            // insert in Selector queue. 
+            //
+            call.connection.responseQueue.addFirst(call);
+            
+            if (inHandler) {
+              // set the serve time when the response has to be sent later
+              call.timestamp = System.currentTimeMillis();
+              
+              // pending > 0 makes run() wait in waitPending() instead of
+              // re-entering select() before register() below completes.
+              incPending();
+              try {
+                // Wakeup the thread blocked on select, only then can the call 
+                // to channel.register() complete.
+                writeSelector.wakeup();
+                channel.register(writeSelector, SelectionKey.OP_WRITE, call);
+              } catch (ClosedChannelException e) {
+                //Its ok. channel might be closed else where.
+                done = true;
+              } finally {
+                decPending();
+              }
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(getName() + ": responding to #" + call.id + " from " +
+                        call.connection + " Wrote partial " + numBytes + 
+                        " bytes.");
+            }
+          }
+          error = false;              // everything went off well
+        }
+      } finally {
+        if (error && call != null) {
+          LOG.warn(getName()+", call " + call + ": output error");
+          done = true;               // error. no more data for this channel.
+          closeConnection(call.connection);
+        }
+      }
+      return done;
+    }
+
+    //
+    // Enqueue a response from the application.
+    // If it is the only queued response, the calling (Handler) thread tries
+    // to write it immediately; otherwise an earlier response is still in
+    // flight and this one is written after it.
+    //
+    void doRespond(Call call) throws IOException {
+      synchronized (call.connection.responseQueue) {
+        call.connection.responseQueue.addLast(call);
+        if (call.connection.responseQueue.size() == 1) {
+          processResponse(call.connection.responseQueue, true);
+        }
+      }
+    }
+
+    private synchronized void incPending() {   // call waiting to be enqueued.
+      pending++;
+    }
+
+    private synchronized void decPending() { // call done enqueueing.
+      pending--;
+      notify();
+    }
+
+    // Blocks the Responder thread while any Handler is registering a channel.
+    private synchronized void waitPending() throws InterruptedException {
+      while (pending > 0) {
+        wait();
+      }
+    }
+  }
+
+  /** Reads calls from a connection and queues them for handling. */
+  private class Connection {
+    private boolean versionRead = false; //if initial signature and
+                                         //version are read
+    private boolean headerRead = false;  //if the connection header that
+                                         //follows version is read.
+    private SocketChannel channel;
+    private ByteBuffer data;
+    private ByteBuffer dataLengthBuffer;
+    private LinkedList<Call> responseQueue;
+    // Number of outstanding RPCs: read from the wire but not yet fully
+    // answered. It is incremented on the Listener thread and decremented on
+    // Handler/Responder threads, so it must be atomic -- ++/-- on a
+    // volatile int is a non-atomic read-modify-write and can lose updates.
+    private final AtomicInteger rpcCount = new AtomicInteger(0);
+    private long lastContact;
+    private int dataLength;
+    private Socket socket;
+    // Cache the remote host & port info so that even if the socket is 
+    // disconnected, we can say where it used to connect to.
+    private String hostAddress;
+    private int remotePort;
+    private UserGroupInformation ticket = null;
+
+    /**
+     * @param key the read key registered for this channel (not retained)
+     * @param channel the accepted client channel
+     * @param lastContact time of the initial contact, in ms
+     */
+    public Connection(SelectionKey key, SocketChannel channel, 
+                      long lastContact) {
+      this.channel = channel;
+      this.lastContact = lastContact;
+      this.data = null;
+      this.dataLengthBuffer = ByteBuffer.allocate(4);
+      this.socket = channel.socket();
+      InetAddress addr = socket.getInetAddress();
+      if (addr == null) {
+        this.hostAddress = "*Unknown*";
+      } else {
+        this.hostAddress = addr.getHostAddress();
+      }
+      this.remotePort = socket.getPort();
+      this.responseQueue = new LinkedList<Call>();
+      if (socketSendBufferSize != 0) {
+        try {
+          socket.setSendBufferSize(socketSendBufferSize);
+        } catch (IOException e) {
+          LOG.warn("Connection: unable to set socket send buffer size to " +
+                   socketSendBufferSize);
+        }
+      }
+    }   
+
+    @Override
+    public String toString() {
+      return getHostAddress() + ":" + remotePort; 
+    }
+    
+    public String getHostAddress() {
+      return hostAddress;
+    }
+
+    public void setLastContact(long lastContact) {
+      this.lastContact = lastContact;
+    }
+
+    public long getLastContact() {
+      return lastContact;
+    }
+
+    /* Return true if the connection has no outstanding rpc */
+    private boolean isIdle() {
+      return rpcCount.get() == 0;
+    }
+    
+    /* Decrement the outstanding RPC count */
+    private void decRpcCount() {
+      rpcCount.decrementAndGet();
+    }
+    
+    /* Increment the outstanding RPC count */
+    private void incRpcCount() {
+      rpcCount.incrementAndGet();
+    }
+    
+    /* True if there is no outstanding rpc and the last contact is more than
+     * maxIdleTime ms before currentTime. */
+    private boolean timedOut(long currentTime) {
+      return isIdle() && currentTime - lastContact > maxIdleTime;
+    }
+
+    /**
+     * Reads available bytes and processes at most one complete RPC.
+     * First the "hrpc" signature plus version byte, then the connection
+     * header, then length-prefixed calls.
+     * @return the count from the last channel read (negative on
+     *         end-of-stream), 0 for a ping, or -1 on a bad header/version;
+     *         callers treat a negative value as a disconnect
+     * @throws IOException on read failure or a negative frame length
+     * @throws InterruptedException if queueing the call is interrupted
+     */
+    public int readAndProcess() throws IOException, InterruptedException {
+      while (true) {
+        /* Read at most one RPC. If the header is not read completely yet
+         * then iterate until we read first RPC or until there is no data left.
+         */    
+        int count = -1;
+        if (dataLengthBuffer.remaining() > 0) {
+          count = channel.read(dataLengthBuffer);       
+          if (count < 0 || dataLengthBuffer.remaining() > 0) 
+            return count;
+        }
+      
+        if (!versionRead) {
+          //Every connection is expected to send the header.
+          ByteBuffer versionBuffer = ByteBuffer.allocate(1);
+          count = channel.read(versionBuffer);
+          if (count <= 0) {
+            return count;
+          }
+          int version = versionBuffer.get(0);
+          
+          dataLengthBuffer.flip();          
+          if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
+            //Warning is ok since this is not supposed to happen.
+            LOG.warn("Incorrect header or version mismatch from " + 
+                     hostAddress + ":" + remotePort +
+                     " got version " + version + 
+                     " expected version " + CURRENT_VERSION);
+            return -1;
+          }
+          dataLengthBuffer.clear();
+          versionRead = true;
+          continue;
+        }
+        
+        if (data == null) {
+          dataLengthBuffer.flip();
+          dataLength = dataLengthBuffer.getInt();
+       
+          if (dataLength == HBaseClient.PING_CALL_ID) {
+            dataLengthBuffer.clear();
+            return 0;  //ping message
+          }
+          if (dataLength < 0) {
+            // Would otherwise surface as an IllegalArgumentException from
+            // ByteBuffer.allocate with no hint about the cause.
+            throw new IOException("Unexpected data length " + dataLength +
+                                  " from " + getHostAddress());
+          }
+          data = ByteBuffer.allocate(dataLength);
+          // Count only real RPCs. The connection header never gets a
+          // response (so never a decRpcCount()); counting it left the
+          // connection permanently non-idle, so it could never time out.
+          if (headerRead) {
+            incRpcCount();  // Increment the rpc count
+          }
+        }
+        
+        count = channel.read(data);
+        
+        if (data.remaining() == 0) {
+          dataLengthBuffer.clear();
+          data.flip();
+          if (headerRead) {
+            processData();
+            data = null;
+            return count;
+          } else {
+            processHeader();
+            headerRead = true;
+            data = null;
+            continue;
+          }
+        } 
+        return count;
+      }
+    }
+
+    /// Reads the header following version
+    private void processHeader() throws IOException {
+      /* In the current version, it is just a ticket.
+       * Later we could introduce a "ConnectionHeader" class.
+       */
+      DataInputStream in =
+        new DataInputStream(new ByteArrayInputStream(data.array()));
+      ticket = (UserGroupInformation) ObjectWritable.readObject(in, conf);
+    }
+    
+    /** Decodes one call (int id followed by the parameter) and queues it. */
+    private void processData() throws  IOException, InterruptedException {
+      DataInputStream dis =
+        new DataInputStream(new ByteArrayInputStream(data.array()));
+      int id = dis.readInt();                    // try to read an id
+        
+      if (LOG.isDebugEnabled())
+        LOG.debug(" got #" + id);
+            
+      Writable param = ReflectionUtils.newInstance(paramClass, conf);           // read param
+      param.readFields(dis);        
+        
+      Call call = new Call(id, param, this);
+      callQueue.put(call);              // queue the call; maybe blocked here
+    }
+
+    /** Releases buffers and closes the socket; close failures are ignored (best effort). */
+    private synchronized void close() throws IOException {
+      data = null;
+      dataLengthBuffer = null;
+      if (!channel.isOpen())
+        return;
+      try {socket.shutdownOutput();} catch(Exception e) {}
+      if (channel.isOpen()) {
+        try {channel.close();} catch(Exception e) {}
+      }
+      try {socket.close();} catch(Exception e) {}
+    }
+  }
+
+  /** Handles queued calls . */
+  private class Handler extends Thread {
+    public Handler(int instanceNumber) {
+      this.setDaemon(true);
+      this.setName("IPC Server handler "+ instanceNumber + " on " + port);
+    }
+
+    @Override
+    public void run() {
+      LOG.info(getName() + ": starting");
+      SERVER.set(HBaseServer.this);
+      final int buffersize = 16 * 1024;
+      ByteArrayOutputStream buf = new ByteArrayOutputStream(buffersize);
+      while (running) {
+        try {
+          Call call = callQueue.take(); // pop the queue; maybe blocked here
+
+          if (LOG.isDebugEnabled())
+            LOG.debug(getName() + ": has #" + call.id + " from " +
+                      call.connection);
+          
+          String errorClass = null;
+          String error = null;
+          Writable value = null;
+          
+          CurCall.set(call);
+          UserGroupInformation previous = UserGroupInformation.getCurrentUGI();
+          UserGroupInformation.setCurrentUGI(call.connection.ticket);
+          try {
+            value = call(call.param, call.timestamp);             // make the call
+          } catch (Throwable e) {
+            LOG.info(getName()+", call "+call+": error: " + e, e);
+            errorClass = e.getClass().getName();
+            error = StringUtils.stringifyException(e);
+          }
+          UserGroupInformation.setCurrentUGI(previous);
+          CurCall.set(null);
+
+          if (buf.size() > buffersize) {
+            // Allocate a new BAOS as reset only moves size back to zero but
+            // keeps the buffer of whatever the largest write was -- see
+            // hbase-900.
+            buf = new ByteArrayOutputStream(buffersize);
+          } else {
+            buf.reset();
+          }
+          DataOutputStream out = new DataOutputStream(buf);
+          out.writeInt(call.id);                // write call id
+          out.writeBoolean(error != null);      // write error flag
+
+          if (error == null) {
+            value.write(out);
+          } else {
+            WritableUtils.writeString(out, errorClass);
+            WritableUtils.writeString(out, error);
+          }
+          call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
+          responder.doRespond(call);
+        } catch (InterruptedException e) {
+          if (running) {                          // unexpected -- log it
+            LOG.info(getName() + " caught: " +
+                     StringUtils.stringifyException(e));
+          }
+        } catch (Exception e) {
+          LOG.info(getName() + " caught: " +
+                   StringUtils.stringifyException(e));
+        }
+      }
+      LOG.info(getName() + ": exiting");
+    }
+
+  }
+  
+  /**
+   * Constructs a server whose metrics name is simply the port number.
+   * See the six-argument constructor for parameter details.
+   */
+  protected HBaseServer(String bindAddress, int port,
+                  Class<? extends Writable> paramClass, int handlerCount,
+                  Configuration conf)
+    throws IOException {
+    this(bindAddress, port, paramClass, handlerCount, conf,
+        String.valueOf(port));
+  }
+  /**
+   * Constructs a server listening on the named port and address.  Parameters
+   * passed must be of the named class.  The <code>handlerCount</code>
+   * determines the number of handler threads that will be used to process
+   * calls.
+   *
+   * @param bindAddress address the listener binds to
+   * @param port requested port; the stored port is then replaced by the port
+   *        the listener reports
+   * @param paramClass class of call parameters, instantiated reflectively for
+   *        each incoming call
+   * @param handlerCount number of handler threads; the call queue holds at
+   *        most <code>handlerCount * MAX_QUEUE_SIZE_PER_HANDLER</code> calls
+   * @param conf configuration supplying the <code>ipc.*</code> settings
+   * @param serverName name handed to the RPC metrics
+   * @throws IOException if setting up the server fails, e.g. while creating
+   *         the listener
+   */
+  protected HBaseServer(String bindAddress, int port, 
+                  Class<? extends Writable> paramClass, int handlerCount, 
+                  Configuration conf, String serverName) 
+    throws IOException {
+    this.bindAddress = bindAddress;
+    this.conf = conf;
+    // Overwritten below with the port the listener actually reports.
+    this.port = port;
+    this.paramClass = paramClass;
+    this.handlerCount = handlerCount;
+    this.socketSendBufferSize = 0;
+    this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
+    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
+    // Server-side idle limit is twice the configured client idle time.
+    this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
+    this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
+    this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
+    
+    // Start the listener here and let it bind to the port
+    listener = new Listener();
+    this.port = listener.getAddress().getPort();    
+    this.rpcMetrics = new HBaseRpcMetrics(serverName,
+                          Integer.toString(this.port), this);
+    this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
+
+    // Create the responder here
+    responder = new Responder();
+  }
+
+  /**
+   * Stops tracking the given connection (decrementing the open-connection
+   * count if it was tracked) and then closes it.
+   */
+  private void closeConnection(Connection connection) {
+    synchronized (connectionList) {
+      boolean wasTracked = connectionList.remove(connection);
+      if (wasTracked) {
+        numConnections--;
+      }
+    }
+    try {
+      connection.close();
+    } catch (IOException ignored) {
+      // Best-effort close; failures are deliberately ignored.
+    }
+  }
+  
+  /** Sets the socket buffer size used for responding to RPCs */
+  public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
+
+  /** Starts the service.  Must be called before any calls will be handled. */
+  public synchronized void start() throws IOException {
+    responder.start();
+    listener.start();
+    handlers = new Handler[handlerCount];
+    
+    for (int i = 0; i < handlerCount; i++) {
+      handlers[i] = new Handler(i);
+      handlers[i].start();
+    }
+  }
+
+  /** Stops the service.  No new calls will be handled after this is called. */
+  public synchronized void stop() {
+    LOG.info("Stopping server on " + port);
+    running = false;
+    if (handlers != null) {
+      for (int i = 0; i < handlerCount; i++) {
+        if (handlers[i] != null) {
+          handlers[i].interrupt();
+        }
+      }
+    }
+    listener.interrupt();
+    listener.doStop();
+    responder.interrupt();
+    notifyAll();
+    if (this.rpcMetrics != null) {
+      this.rpcMetrics.shutdown();
+    }
+  }
+
+  /**
+   * Blocks until the server is no longer running (see {@link #stop()}).
+   * Does not wait for the listener, responder or handler threads to finish.
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public synchronized void join() throws InterruptedException {
+    for (;;) {
+      if (!running) {
+        return;
+      }
+      wait();
+    }
+  }
+
+  /**
+   * Return the socket (ip+port) on which the RPC server is listening to.
+   * @return the socket (ip+port) on which the RPC server is listening to.
+   */
+  public synchronized InetSocketAddress getListenerAddress() {
+    return listener.getAddress();
+  }
+  
+  /**
+   * Invoked by a handler thread for each incoming call.
+   * @param param the call parameter, already deserialized from the wire
+   * @param receiveTime the call's timestamp (presumably its receive time)
+   * @return the value to serialize back to the caller
+   * @throws IOException on failure; handlers report any exception back to
+   *         the caller as an error response
+   */
+  public abstract Writable call(Writable param, long receiveTime) throws IOException;
+  
+  
+  /**
+   * The number of open RPC connections.
+   * @return the number of open rpc connections
+   */
+  public int getNumOpenConnections() {
+    // NOTE(review): numConnections is updated while holding connectionList
+    // but read here without that lock; unless the field is volatile the
+    // value may be stale -- confirm this is acceptable for a metric.
+    return numConnections;
+  }
+  
+  /**
+   * Returns how many calls are currently waiting in the call queue.
+   * @return the number of rpc calls in the queue
+   */
+  public int getCallQueueLen() {
+    final int queued = callQueue.size();
+    return queued;
+  }
+  
+}

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/package.html
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/package.html?rev=724231&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/package.html (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/package.html Sun Dec  7 17:43:45 2008
@@ -0,0 +1,24 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+Tools to help define network clients and servers.
+This is a local copy of the Hadoop IPC code, kept here so that we can fix bugs and make HBase-specific optimizations.
+</body>
+</html>

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java?rev=724231&r1=724230&r2=724231&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java Sun Dec  7 17:43:45 2008
@@ -60,10 +60,11 @@
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.ipc.HbaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -76,7 +77,6 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.Server;
 
 /**
  * HMaster is the "master server" for a HBase.
@@ -91,8 +91,7 @@
   
   static final Log LOG = LogFactory.getLog(HMaster.class.getName());
 
-  public long getProtocolVersion(@SuppressWarnings("unused") String protocol,
-      @SuppressWarnings("unused") long clientVersion) {
+  public long getProtocolVersion(String protocol, long clientVersion) {
     return HBaseRPCProtocolVersion.versionID;
   }
 
@@ -117,7 +116,7 @@
   volatile BlockingQueue<RegionServerOperation> toDoQueue =
     new LinkedBlockingQueue<RegionServerOperation>();
 
-  private final Server server;
+  private final HBaseServer server;
   private final HServerAddress address;
 
   final ServerConnection connection;
@@ -222,7 +221,7 @@
       conf.getLong("hbase.hbasemaster.maxregionopen", 120 * 1000);
     this.leaseTimeout = conf.getInt("hbase.master.lease.period", 120 * 1000);
     
-    this.server = HbaseRPC.getServer(this, address.getBindAddress(),
+    this.server = HBaseRPC.getServer(this, address.getBindAddress(),
         address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
         false, conf);
 
@@ -530,13 +529,11 @@
   /*
    * HMasterRegionInterface
    */
-
-  @SuppressWarnings("unused")
   public MapWritable regionServerStartup(HServerInfo serverInfo)
   throws IOException {
     // Set the address for now even tho it will not be persisted on
     // the HRS side.
-    String rsAddress = Server.getRemoteAddress();
+    String rsAddress = HBaseServer.getRemoteAddress();
     serverInfo.setServerAddress(new HServerAddress
         (rsAddress, serverInfo.getServerAddress().getPort()));
     // register with server manager
@@ -552,7 +549,7 @@
   protected MapWritable createConfigurationSubset() {
     MapWritable mw = addConfig(new MapWritable(), HConstants.HBASE_DIR);
     // Get the real address of the HRS.
-    String rsAddress = Server.getRemoteAddress();
+    String rsAddress = HBaseServer.getRemoteAddress();
     if (rsAddress != null) {
       mw.put(new Text("hbase.regionserver.address"), new Text(rsAddress));
     }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=724231&r1=724230&r2=724231&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Dec  7 17:43:45 2008
@@ -72,8 +72,8 @@
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionHistorian;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.UnknownRowLockException;
+import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.ValueOverMaxLengthException;
 import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.client.ServerConnection;
@@ -84,10 +84,11 @@
 import org.apache.hadoop.hbase.io.Cell;
 import org.apache.hadoop.hbase.io.HbaseMapWritable;
 import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.ipc.HbaseRPC;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -96,7 +97,6 @@
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
@@ -154,7 +154,7 @@
 
   // Server to handle client requests.  Default access so can be accessed by
   // unit tests.
-  final Server server;
+  final HBaseServer server;
   
   // Leases
   private final Leases leases;
@@ -258,7 +258,7 @@
     this.workerThread = new Thread(worker);
 
     // Server to handle client requests
-    this.server = HbaseRPC.getServer(this, address.getBindAddress(), 
+    this.server = HBaseRPC.getServer(this, address.getBindAddress(), 
       address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
       false, conf);
     // Address is givin a default IP for the moment. Will be changed after
@@ -518,7 +518,7 @@
         serverInfo.getServerAddress().toString());
     }
     if (this.hbaseMaster != null) {
-      HbaseRPC.stopProxy(this.hbaseMaster);
+      HBaseRPC.stopProxy(this.hbaseMaster);
       this.hbaseMaster = null;
     }
     join();
@@ -959,7 +959,7 @@
       try {
         // Do initial RPC setup.  The final argument indicates that the RPC
         // should retry indefinitely.
-        master = (HMasterRegionInterface)HbaseRPC.waitForProxy(
+        master = (HMasterRegionInterface)HBaseRPC.waitForProxy(
             HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
             new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
             this.conf, -1);

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=724231&r1=724230&r2=724231&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Sun Dec  7 17:43:45 2008
@@ -823,7 +823,7 @@
       List<HStoreFile> filesToCompact = null;
       synchronized (storefiles) {
         if (this.storefiles.size() <= 0) {
-          LOG.debug("no store files to compact");
+          LOG.debug(this.storeNameStr + ": no store files to compact");
           return null;
         }
         // filesToCompact are sorted oldest to newest.