You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2008/12/08 04:20:44 UTC

svn commit: r724238 [1/3] - in /hadoop/hbase/branches/0.19_on_hadoop_0.18: ./ conf/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apach...

Author: apurtell
Date: Sun Dec  7 19:20:43 2008
New Revision: 724238

URL: http://svn.apache.org/viewvc?rev=724238&view=rev
Log:
merge up to trunk (rev 724237)

Added:
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/package.html
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/rest/package.html
Removed:
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/ipc/
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/TestGlobalMemcacheLimit.java
Modified:
    hadoop/hbase/branches/0.19_on_hadoop_0.18/CHANGES.txt
    hadoop/hbase/branches/0.19_on_hadoop_0.18/conf/hbase-default.xml
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/Chore.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HTable.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/io/BloomFilterMapFile.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/master/HMaster.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/util/Sleeper.java
    hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/onelab/filter/Filter.java

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/CHANGES.txt?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/CHANGES.txt Sun Dec  7 19:20:43 2008
@@ -90,12 +90,20 @@
    HBASE-1041  Migration throwing NPE
    HBASE-1042  OOME but we don't abort; two part commit.
    HBASE-927   We don't recover if HRS hosting -ROOT-/.META. goes down
+   HBASE-1029  REST wiki documentation incorrect
+               (Sishen Freecity via Stack)
    HBASE-1043  Removing @Override attributes where they are no longer needed.
                (Ryan Smith via Jim Kellerman)
    HBASE-927   We don't recover if HRS hosting -ROOT-/.META. goes down -
                (fix bug in createTable which caused tests to fail)
+   HBASE-1039  Compaction fails if bloomfilters are enabled
+   HBASE-1027  Make global flusher check work with percentages rather than
+               hard code memory sizes
+   HBASE-1000  Sleeper.sleep does not go back to sleep when interrupted
+               and no stop flag given.
+   HBASE-900   Regionserver memory leak causing OOME during relatively
+               modest bulk importing; part 1
 
-      
   IMPROVEMENTS
    HBASE-901   Add a limit to key length, check key and value length on client side
    HBASE-890   Alter table operation and also related changes in REST interface
@@ -158,6 +166,8 @@
    HBASE-1030  Bit of polish on HBASE-1018
    HBASE-847   new API: HTable.getRow with numVersion specified
                (Doğacan Güney via Stack)
+   HBASE-1048  HLog: Found 0 logs to remove out of total 1450; oldest
+               outstanding seqnum is 162297053 from region -ROOT-,,0
 
   NEW FEATURES
    HBASE-875   Use MurmurHash instead of JenkinsHash [in bloomfilters]

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/conf/hbase-default.xml?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/conf/hbase-default.xml (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/conf/hbase-default.xml Sun Dec  7 19:20:43 2008
@@ -191,19 +191,19 @@
     </description>
   </property>
   <property>
-    <name>hbase.regionserver.globalMemcacheLimit</name>
-    <value>536870912</value>
+    <name>hbase.regionserver.globalMemcache.upperLimit</name>
+    <value>0.4</value>
     <description>Maximum size of all memcaches in a region server before new 
-      updates are blocked and flushes are forced. Defaults to 512MB.
+      updates are blocked and flushes are forced. Defaults to 40% of heap.
     </description>
   </property>
   <property>
-    <name>hbase.regionserver.globalMemcacheLimitlowMark</name>
-    <value>256435456</value>
+    <name>hbase.regionserver.globalMemcache.lowerLimit</name>
+    <value>0.25</value>
     <description>When memcaches are being forced to flush to make room in
-      memory, keep flushing until we hit this mark. Defaults to 256MB. Setting
-      this value equal to hbase.regionserver.globalmemcachelimit causes the 
-      minimum possible flushing to occur when updates are blocked due to 
+      memory, keep flushing until we hit this mark. Defaults to 25% of heap.
+      Setting this value equal to hbase.regionserver.globalMemcache.upperLimit causes
+      the minimum possible flushing to occur when updates are blocked due to 
       memcache limiting.
     </description>
   </property>  

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/Chore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/Chore.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/Chore.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/Chore.java Sun Dec  7 19:20:43 2008
@@ -59,19 +59,21 @@
         this.sleeper.sleep();
       }
       this.sleeper.sleep();
-      while(!this.stop.get()) {
+      while (!this.stop.get()) {
+        long startTime = System.currentTimeMillis();
         try {
-          long startTime = System.currentTimeMillis();
           chore();
-          this.sleeper.sleep(startTime);
         } catch (Exception e) {
           LOG.error("Caught exception", e);
+          if (this.stop.get()) {
+            continue;
+          }
         }
+        this.sleeper.sleep(startTime);
       }
     } catch (Throwable t) {
       LOG.fatal("Caught error. Starting shutdown.", t);
       this.stop.set(true);
-      
     } finally {
       LOG.info(getName() + " exiting");
     }
@@ -97,4 +99,4 @@
   protected void sleep() {
     this.sleeper.sleep();
   }
-}
\ No newline at end of file
+}

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java Sun Dec  7 19:20:43 2008
@@ -50,7 +50,7 @@
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.ipc.HbaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.SoftValueSortedMap;
 import org.apache.hadoop.hbase.util.Writables;
@@ -192,7 +192,7 @@
           masterLocation = new HServerAddress(this.conf.get(MASTER_ADDRESS,
             DEFAULT_MASTER_ADDRESS));
           try {
-            HMasterInterface tryMaster = (HMasterInterface)HbaseRPC.getProxy(
+            HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
                 HMasterInterface.class, HBaseRPCProtocolVersion.versionID, 
                 masterLocation.getInetSocketAddress(), this.conf);
             
@@ -732,7 +732,7 @@
         server = this.servers.get(regionServer.toString());
         if (server == null) { // Get a connection
           try {
-            server = (HRegionInterface)HbaseRPC.waitForProxy(
+            server = (HRegionInterface)HBaseRPC.waitForProxy(
                 serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
                 regionServer.getInetSocketAddress(), this.conf, 
                 this.maxRPCAttempts);
@@ -954,7 +954,7 @@
     void close(boolean stopProxy) {
       if (master != null) {
         if (stopProxy) {
-          HbaseRPC.stopProxy(master);
+          HBaseRPC.stopProxy(master);
         }
         master = null;
         masterChecked = false;
@@ -962,7 +962,7 @@
       if (stopProxy) {
         synchronized (servers) {
           for (HRegionInterface i: servers.values()) {
-            HbaseRPC.stopProxy(i);
+            HBaseRPC.stopProxy(i);
           }
         }
       }

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HTable.java Sun Dec  7 19:20:43 2008
@@ -487,7 +487,7 @@
    * at a specified timestamp
    * 
    * @param row row key
-   * @param ts timestamp
+   * @param timestamp timestamp
    * @param numVersions number of versions to return
    * @return RowResult is empty if row does not exist.
    * @throws IOException

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/io/BloomFilterMapFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/io/BloomFilterMapFile.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/io/BloomFilterMapFile.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/io/BloomFilterMapFile.java Sun Dec  7 19:20:43 2008
@@ -75,8 +75,8 @@
     throws IOException {
       Path filterFile = new Path(dirName, BLOOMFILTER_FILE_NAME);
       if(!fs.exists(filterFile)) {
-        throw new FileNotFoundException("Could not find bloom filter: " +
-            filterFile);
+        LOG.warn("FileNotFound: " + filterFile + "; proceeding without");
+        return null;
       }
       BloomFilter filter = new BloomFilter();
       FSDataInputStream in = fs.open(filterFile);
@@ -180,13 +180,19 @@
          * If we fix the number of hash functions and know the number of
          * entries, then the optimal vector size m = (k * n) / ln(2)
          */
-        this.bloomFilter = new BloomFilter(
+        BloomFilter f = null;
+        try {
+          f  = new BloomFilter(
             (int) Math.ceil(
                 (DEFAULT_NUMBER_OF_HASH_FUNCTIONS * (1.0 * nrows)) /
                 Math.log(2.0)),
             (int) DEFAULT_NUMBER_OF_HASH_FUNCTIONS,
             Hash.getHashType(conf)
-        );
+          );
+        } catch (IllegalArgumentException e) {
+          LOG.warn("Failed creating bloomfilter; proceeding without", e);
+        }
+        this.bloomFilter = f;
       } else {
         this.bloomFilter = null;
       }

Added: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=724238&view=auto
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (added)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Sun Dec  7 19:20:43 2008
@@ -0,0 +1,844 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.net.ConnectException;
+
+import java.io.IOException;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FilterInputStream;
+import java.io.InputStream;
+
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** A client for an IPC service.  IPC calls take a single {@link Writable} as a
+ * parameter, and return a {@link Writable} as their value.  A service runs on
+ * a port and is defined by a parameter class and a value class.
+ * 
+ * <p>This is the org.apache.hadoop.ipc.Client renamed as HBaseClient and
+ * moved into this package so can access package-private methods.
+ * 
+ * @see HBaseServer
+ */
+public class HBaseClient {
+  
+  // Logger for the IPC client. The name keeps the org.apache.hadoop.ipc
+  // prefix but must end in this class's name (was misspelled "HBaseClass").
+  public static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
+  private Hashtable<ConnectionId, Connection> connections =
+    new Hashtable<ConnectionId, Connection>();
+
+  private Class<? extends Writable> valueClass;   // class of call values
+  private int counter;                            // counter for call ids
+  private AtomicBoolean running = new AtomicBoolean(true); // if client runs
+  final private Configuration conf;
+  final private int maxIdleTime; //connections will be culled if it was idle for 
+                           //maxIdleTime msecs
+  final private int maxRetries; //the max. no. of retries for socket connections
+  private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
+  private int pingInterval; // how often sends ping to the server in msecs
+
+  private SocketFactory socketFactory;           // how to create sockets
+  private int refCount = 1;
+  
+  final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
+  final static int DEFAULT_PING_INTERVAL = 60000; // 1 min
+  final static int PING_CALL_ID = -1;
+  
+  /**
+   * Record the ping interval in the supplied configuration under the
+   * "ipc.ping.interval" key.
+   * 
+   * @param conf Configuration to update
+   * @param pingInterval the ping interval to store
+   */
+  public static final void setPingInterval(Configuration conf, int pingInterval) {
+    conf.setInt(PING_INTERVAL_NAME, pingInterval);
+  }
+
+  /**
+   * Look up the ping interval in the configuration, falling back to
+   * DEFAULT_PING_INTERVAL when the key is not set.
+   * 
+   * @param conf Configuration to read
+   * @return the configured ping interval, or the default
+   */
+  static final int getPingInterval(Configuration conf) {
+    final int interval = conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
+    return interval;
+  }
+  
+  /**
+   * Add one to this client's reference count; synchronized on the client.
+   */
+  synchronized void incCount() {
+    refCount += 1;
+  }
+  
+  /**
+   * Subtract one from this client's reference count; synchronized on the
+   * client.
+   */
+  synchronized void decCount() {
+    refCount -= 1;
+  }
+  
+  /**
+   * Return if this client has no reference
+   * 
+   * @return true if this client has no reference; false otherwise
+   */
+  synchronized boolean isZeroReference() {
+    return refCount==0;
+  }
+
+  /** One outstanding RPC: the parameter that is sent, plus the slots in
+   * which either the returned value or the error is delivered. */
+  private class Call {
+    int id;                                       // call id
+    Writable param;                               // parameter
+    Writable value;                               // value, null if error
+    IOException error;                            // exception, null if value
+    boolean done;                                 // true when call is done
+
+    protected Call(Writable param) {
+      // ids come from the client-wide counter, taken under the client lock
+      synchronized (HBaseClient.this) {
+        id = counter++;
+      }
+      this.param = param;
+    }
+
+    /** Flag the call as finished and wake one thread waiting on this
+     * call object. */
+    protected synchronized void callComplete() {
+      done = true;
+      this.notify();
+    }
+
+    /** Record a failure and complete the call.
+     * 
+     * @param error exception thrown by the call; either local or remote
+     */
+    public synchronized void setException(IOException error) {
+      this.error = error;
+      this.callComplete();
+    }
+    
+    /** Record the successful result and complete the call.
+     * 
+     * @param value return value of the call.
+     */
+    public synchronized void setValue(Writable value) {
+      this.value = value;
+      this.callComplete();
+    }
+  }
+
+  /** Thread that reads responses and notifies callers.  Each connection owns a
+   * socket connected to a remote address.  Calls are multiplexed through this
+   * socket: responses may be delivered out of order. */
+  private class Connection extends Thread {
+    private ConnectionId remoteId;
+    private Socket socket = null;                 // connected socket
+    private DataInputStream in;
+    private DataOutputStream out;
+    
+    // currently active calls
+    private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
+    private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
+    private AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
+    private IOException closeException; // close reason
+
+    /** Create a connection to the given address by delegating to the
+     * ConnectionId constructor with a null second argument (presumably
+     * "no ticket" -- confirm against ConnectionId).
+     *
+     * @param address remote address to connect to
+     * @throws IOException propagated from the delegated constructor
+     */
+    public Connection(InetSocketAddress address) throws IOException {
+      this(new ConnectionId(address, null));
+    }
+    
+    public Connection(ConnectionId remoteId) throws IOException {
+      if (remoteId.getAddress().isUnresolved()) {
+        throw new UnknownHostException("unknown host: " + 
+                                       remoteId.getAddress().getHostName());
+      }
+      this.remoteId = remoteId;
+      UserGroupInformation ticket = remoteId.getTicket();
+      this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
+          remoteId.getAddress().toString() +
+          " from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
+      this.setDaemon(true);
+    }
+
+    /** Update lastActivity with the current time. */
+    private void touch() {
+      lastActivity.set(System.currentTimeMillis());
+    }
+
+    /**
+     * Add a call to this connection's call queue and notify
+     * a listener; synchronized.
+     * Returns false if called during shutdown.
+     * @param call to add
+     * @return true if the call was added.
+     */
+    private synchronized boolean addCall(Call call) {
+      if (shouldCloseConnection.get())
+        return false;
+      calls.put(call.id, call);
+      notify();
+      return true;
+    }
+
+    /** Input stream wrapper that reacts to a read timeout by pinging the
+     * remote side and retrying the read, for as long as the connection
+     * and the client are still live.
+     */
+    private class PingInputStream extends FilterInputStream {
+      /* constructor */
+      protected PingInputStream(InputStream in) {
+        super(in);
+      }
+
+      /* Process timeout exception:
+       * rethrow it when the connection is closing or the client has
+       * stopped; otherwise send a ping so the caller can retry the read.
+       */
+      private void handleTimeout(SocketTimeoutException e) throws IOException {
+        final boolean live = !shouldCloseConnection.get() && running.get();
+        if (!live) {
+          throw e;
+        }
+        sendPing();
+      }
+      
+      /** Read a single byte.
+       * A read timeout triggers a ping and another attempt; the loop only
+       * ends when a read returns or an exception is thrown.
+       * @throws IOException for any IO problem other than socket timeout
+       */
+      public int read() throws IOException {
+        for (;;) {
+          try {
+            return super.read();
+          } catch (SocketTimeoutException timeout) {
+            handleTimeout(timeout);
+          }
+        }
+      }
+
+      /** Read up to <code>len</code> bytes into <code>buf</code> starting
+       * at offset <code>off</code>.
+       * A read timeout triggers a ping and another attempt; the loop only
+       * ends when a read returns or an exception is thrown.
+       * 
+       * @return the total number of bytes read; -1 if the connection is closed.
+       */
+      public int read(byte[] buf, int off, int len) throws IOException {
+        for (;;) {
+          try {
+            return super.read(buf, off, len);
+          } catch (SocketTimeoutException timeout) {
+            handleTimeout(timeout);
+          }
+        }
+      }
+    }
+    
+    /** Connect to the server and set up the I/O streams. It then sends
+     * a header to the server and starts
+     * the connection thread that waits for responses.
+     *
+     * Does nothing if a socket already exists or the connection is marked
+     * for closing. An IOException raised while connecting, wrapping the
+     * streams or writing the header is not propagated: the connection is
+     * marked closed with that exception and then closed.
+     */
+    private synchronized void setupIOstreams() {
+      if (socket != null || shouldCloseConnection.get()) {
+        return;
+      }
+      
+      // connect timeouts and other I/O failures are counted separately,
+      // each against its own retry limit
+      short ioFailures = 0;
+      short timeoutFailures = 0;
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Connecting to "+remoteId.getAddress());
+        }
+        // loop until a connect succeeds (break) or handleConnectionFailure
+        // rethrows because the relevant retry limit was reached
+        while (true) {
+          try {
+            this.socket = socketFactory.createSocket();
+            this.socket.setTcpNoDelay(tcpNoDelay);
+            // connection time out is 20s
+            this.socket.connect(remoteId.getAddress(), 20000);
+            // read timeout equals the ping interval, so a silent server
+            // surfaces as a SocketTimeoutException in PingInputStream
+            this.socket.setSoTimeout(pingInterval);
+            break;
+          } catch (SocketTimeoutException toe) {
+            /* The max number of retries is 45,
+             * which amounts to 20s*45 = 15 minutes retries.
+             */
+            handleConnectionFailure(timeoutFailures++, 45, toe);
+          } catch (IOException ie) {
+            handleConnectionFailure(ioFailures++, maxRetries, ie);
+          }
+        }
+        this.in = new DataInputStream(new BufferedInputStream
+            (new PingInputStream(NetUtils.getInputStream(socket))));
+        this.out = new DataOutputStream
+            (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
+        writeHeader();
+
+        // update last activity time
+        touch();
+
+        // start the receiver thread after the socket connection has been set up
+        start();
+      } catch (IOException e) {
+        // record the failure as the close reason, then release resources
+        markClosed(e);
+        close();
+      }
+    }
+
+    /* Handle connection failures
+     *
+     * Closes the socket of the failed attempt, if one was created. If the
+     * current number of retries has reached the max number of retries,
+     * stop retrying and rethrow the failure; otherwise back off 1 second
+     * so the caller can try connecting again.
+     *
+     * This method is only called from inside setupIOstreams(), which is
+     * synchronized. Hence the sleep happens while the connection lock is
+     * held.
+     *
+     * @param curRetries current number of retries
+     * @param maxRetries max number of retries allowed
+     * @param ioe failure reason
+     * @throws IOException if max number of retries is reached
+     */
+    private void handleConnectionFailure(
+        int curRetries, int maxRetries, IOException ioe) throws IOException {
+      // close the current connection. The socket is still null when
+      // socketFactory.createSocket() itself was the call that failed;
+      // closing it unguarded would raise a NullPointerException that
+      // escapes the IOException handling in setupIOstreams().
+      if (socket != null) {
+        try {
+          socket.close();
+        } catch (IOException e) {
+          LOG.warn("Not able to close a socket", e);
+        }
+      }
+      // set socket to null so that the next call to setupIOstreams
+      // can start the process of connect all over again.
+      socket = null;
+
+      // throw the exception if the maximum number of retries is reached
+      if (curRetries >= maxRetries) {
+        throw ioe;
+      }
+
+      // otherwise back off and retry; an interrupt only shortens the pause
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ignored) {}
+      
+      LOG.info("Retrying connect to server: " + remoteId.getAddress() + 
+          ". Already tried " + curRetries + " time(s).");
+    }
+
+    /* Write the header for each connection
+     * Out is not synchronized because only the first thread does this.
+     */
+    private void writeHeader() throws IOException {
+      out.write(HBaseServer.HEADER.array());
+      out.write(HBaseServer.CURRENT_VERSION);
+      //When there are more fields we can have ConnectionHeader Writable.
+      DataOutputBuffer buf = new DataOutputBuffer();
+      ObjectWritable.writeObject(buf, remoteId.getTicket(), 
+                                 UserGroupInformation.class, conf);
+      int bufLen = buf.getLength();
+      out.writeInt(bufLen);
+      out.write(buf.getData(), 0, bufLen);
+    }
+    
+    /* wait till someone signals us to start reading RPC response or
+     * it is idle too long, it is marked as to be closed, 
+     * or the client is marked as not running.
+     * 
+     * Waits at most once, for the remainder of maxIdleTime since the last
+     * activity, and then classifies the state. Every outcome other than
+     * "there are pending calls on a live connection" returns false, and
+     * the idle/stopped outcomes also mark the connection closed.
+     * 
+     * Return true if it is time to read a response; false otherwise.
+     */
+    private synchronized boolean waitForWork() {
+      if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
+        // nothing pending on a live connection: sleep for whatever is left
+        // of the idle budget (addCall and markClosed notify this monitor)
+        long timeout = maxIdleTime-
+              (System.currentTimeMillis()-lastActivity.get());
+        if (timeout>0) {
+          try {
+            wait(timeout);
+          } catch (InterruptedException e) {}
+          // an interrupt is ignored here; the state is re-checked below
+        }
+      }
+      
+      if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
+        // pending calls on a live connection: go read a response
+        return true;
+      } else if (shouldCloseConnection.get()) {
+        // already marked closed elsewhere; the close reason is already set
+        return false;
+      } else if (calls.isEmpty()) { // idle connection closed or stopped
+        markClosed(null);
+        return false;
+      } else { // get stopped but there are still pending requests 
+        // pending callers will see an IOException caused by an
+        // InterruptedException once the connection is closed
+        markClosed((IOException)new IOException().initCause(
+            new InterruptedException()));
+        return false;
+      }
+    }
+
+    /** @return the address held by this connection's remote id */
+    public InetSocketAddress getRemoteAddress() {
+      final InetSocketAddress address = remoteId.getAddress();
+      return address;
+    }
+
+    /* Send a ping to the server if the time elapsed 
+     * since last I/O activity is equal to or greater than the ping interval
+     */
+    private synchronized void sendPing() throws IOException {
+      long curTime = System.currentTimeMillis();
+      if ( curTime - lastActivity.get() >= pingInterval) {
+        lastActivity.set(curTime);
+        synchronized (out) {
+          out.writeInt(PING_CALL_ID);
+          out.flush();
+        }
+      }
+    }
+
+    public void run() {
+      if (LOG.isDebugEnabled())
+        LOG.debug(getName() + ": starting, having connections " 
+            + connections.size());
+
+      while (waitForWork()) {//wait here for work - read or close connection
+        receiveResponse();
+      }
+      
+      close();
+      
+      if (LOG.isDebugEnabled())
+        LOG.debug(getName() + ": stopped, remaining connections "
+            + connections.size());
+    }
+
+    /** Serialize one call and push it onto the socket.
+     * Note: this runs on the caller's thread, not on the Connection
+     * thread; writers take the output stream's lock. An IOException
+     * marks the connection closed instead of propagating, and nothing is
+     * sent once the connection is marked for closing.
+     */
+    public void sendParam(Call call) {
+      if (shouldCloseConnection.get()) {
+        return;
+      }
+
+      DataOutputBuffer frame = null;
+      try {
+        synchronized (this.out) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(getName() + " sending #" + call.id);
+          }
+          
+          // build the whole frame (call id + parameter) in memory so its
+          // length is known before anything is written to the stream
+          frame = new DataOutputBuffer();
+          frame.writeInt(call.id);
+          call.param.write(frame);
+          final byte[] payload = frame.getData();
+          final int payloadLength = frame.getLength();
+          out.writeInt(payloadLength);          // length prefix first
+          out.write(payload, 0, payloadLength); // then the payload
+          out.flush();
+        }
+      } catch (IOException e) {
+        markClosed(e);
+      } finally {
+        // only an in-memory buffer, but release it promptly anyway
+        IOUtils.closeStream(frame);
+      }
+    }  
+
+    /* Receive a response.
+     * Because only one receiver, so no synchronization on in.
+     *
+     * Reads the call id, removes the matching call from the pending table,
+     * then reads the error flag followed by either two strings (used to
+     * build a RemoteException) or a value of valueClass, and completes the
+     * call accordingly. An IOException while reading marks the connection
+     * closed.
+     */
+    private void receiveResponse() {
+      if (shouldCloseConnection.get()) {
+        return;
+      }
+      touch();
+      
+      try {
+        int id = in.readInt();                    // try to read an id
+
+        if (LOG.isDebugEnabled())
+          LOG.debug(getName() + " got value #" + id);
+
+        // NOTE(review): remove() returns null when no call is registered
+        // under this id; the setException/setValue calls below would then
+        // throw a NullPointerException, which the IOException handler does
+        // not catch -- confirm the server never answers an unknown id.
+        Call call = calls.remove(id);
+
+        boolean isError = in.readBoolean();     // read if error
+        if (isError) {
+          // the two strings are read in argument order, left to right
+          call.setException(new RemoteException( WritableUtils.readString(in),
+              WritableUtils.readString(in)));
+        } else {
+          Writable value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+          value.readFields(in);                 // read value
+          call.setValue(value);
+        }
+      } catch (IOException e) {
+        markClosed(e);
+      }
+    }
+    
+    /* Flags the connection for closing, records the cause and wakes every
+     * thread waiting on this connection.  Only the first invocation has any
+     * effect; later ones return without touching the recorded cause.
+     */
+    private synchronized void markClosed(IOException e) {
+      if (!shouldCloseConnection.compareAndSet(false, true)) {
+        return;                       // already marked by an earlier failure
+      }
+      closeException = e;
+      notifyAll();
+    }
+    
+    /** Close the connection: take it out of the pool, close both streams and
+     * fail every call that is still outstanding.  Does nothing except log an
+     * error when the connection has not been marked for closing. */
+    private synchronized void close() {
+      if (!shouldCloseConnection.get()) {
+        LOG.error("The connection is not in the closed state");
+        return;
+      }
+
+      // first take the connection out of the connection list, but only if
+      // the entry registered under our id is still this very object
+      synchronized (connections) {
+        if (connections.get(remoteId) == this) {
+          connections.remove(remoteId);
+        }
+      }
+
+      // close the streams and therefore the socket
+      IOUtils.closeStream(out);
+      IOUtils.closeStream(in);
+
+      // finally fail whatever calls are still pending
+      if (closeException != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("closing ipc connection to " + remoteId.address + ": " +
+              closeException.getMessage(),closeException);
+        }
+        cleanupCalls();
+      } else if (!calls.isEmpty()) {
+        LOG.warn(
+            "A connection is closed for no cause and calls are not empty");
+
+        // no cause was recorded, so supply one for the waiting callers
+        closeException = new IOException("Unexpected closed connection");
+        cleanupCalls();
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getName() + ": closed");
+      }
+    }
+    
+    /* Fails every pending call with closeException and empties the table. */
+    private void cleanupCalls() {
+      for (Iterator<Entry<Integer, Call>> it = calls.entrySet().iterator();
+          it.hasNext();) {
+        Entry<Integer, Call> entry = it.next();
+        entry.getValue().setException(closeException); // local exception
+        it.remove();
+      }
+    }
+  }
+
+  /** A call that is one member of a batch of parallel calls; on completion it
+   * reports itself to the collector shared by the batch. */
+  private class ParallelCall extends Call {
+    /** Collector shared by all calls of the batch. */
+    private ParallelResults results;
+    /** Slot of this call's value in the collector's array. */
+    private int index;
+    
+    public ParallelCall(Writable param, ParallelResults results, int index) {
+      super(param);
+      this.index = index;
+      this.results = results;
+    }
+
+    /** Hands the finished call over to the result collector. */
+    protected void callComplete() {
+      this.results.callComplete(this);
+    }
+  }
+
+  /** Collects the values of a batch of parallel calls and notifies the
+   * waiting caller once the expected number of calls has completed. */
+  private static class ParallelResults {
+    /** One slot per call, indexed by the call's position in the batch. */
+    private Writable[] values;
+    /** Number of completions to wait for. */
+    private int size;
+    /** Number of completions seen so far. */
+    private int count;
+
+    public ParallelResults(int size) {
+      this.size = size;
+      this.values = new Writable[size];
+    }
+
+    /** Stores the value of a completed call and counts it. */
+    public synchronized void callComplete(ParallelCall call) {
+      values[call.index] = call.value;
+      count++;
+      if (count != size) {
+        return;                                   // still waiting for others
+      }
+      notify();                                   // all in: wake the caller
+    }
+  }
+
+  /** Construct an IPC client whose values are of the given {@link Writable}
+   * class.
+   * @param valueClass class of the values returned by calls
+   * @param conf configuration from which the ipc.client.* settings are read
+   * @param factory socket factory kept by this client
+   */
+  public HBaseClient(Class<? extends Writable> valueClass, Configuration conf, 
+      SocketFactory factory) {
+    this.valueClass = valueClass;
+    this.maxIdleTime = 
+      conf.getInt("ipc.client.connection.maxidletime", 10000); //10s
+    this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
+    this.tcpNoDelay = conf.getBoolean("ipc.client.tcpnodelay", false);
+    this.pingInterval = getPingInterval(conf);
+    if (LOG.isDebugEnabled()) {
+      // separate the number from the surrounding words
+      LOG.debug("The ping interval is " + this.pingInterval + " ms.");
+    }
+    this.conf = conf;
+    this.socketFactory = factory;
+  }
+
+  /**
+   * Construct an IPC client with the default SocketFactory, as returned by
+   * <code>NetUtils.getDefaultSocketFactory(conf)</code>.
+   * @param valueClass class of the values returned by calls
+   * @param conf configuration for the client; also used to pick the factory
+   */
+  public HBaseClient(Class<? extends Writable> valueClass, Configuration conf) {
+    this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
+  }
+ 
+  /** Return the socket factory this client was constructed with.
+   *
+   * @return this client's socket factory
+   */
+  SocketFactory getSocketFactory() {
+    return socketFactory;
+  }
+
+  /** Stop all threads related to this client.  No further calls may be made
+   * using this client. */
+  public void stop() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Stopping client");
+    }
+
+    if (!running.compareAndSet(true, false)) {
+      return;
+    }
+    
+    // wake up all connections
+    synchronized (connections) {
+      for (Connection conn : connections.values()) {
+        conn.interrupt();
+      }
+    }
+    
+    // wait until all connections are closed
+    while (!connections.isEmpty()) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+
+  /** Make a call, passing <code>param</code>, to the IPC server running at
+   * <code>address</code>, returning the value.  Throws exceptions if there are
+   * network problems or if the remote code threw an exception.
+   * Delegates to the three-argument overload with a null ticket.
+   * @param param the call parameter
+   * @param address address of the server
+   * @return the value of the completed call
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public Writable call(Writable param, InetSocketAddress address)
+  throws InterruptedException, IOException {
+      return call(param, address, null);
+  }
+  
+  public Writable call(Writable param, InetSocketAddress addr, 
+                       UserGroupInformation ticket)  
+                       throws InterruptedException, IOException {
+    Call call = new Call(param);
+    Connection connection = getConnection(addr, ticket, call);
+    connection.sendParam(call);                 // send the parameter
+    synchronized (call) {
+      while (!call.done) {
+        try {
+          call.wait();                           // wait for the result
+        } catch (InterruptedException ignored) {}
+      }
+
+      if (call.error != null) {
+        if (call.error instanceof RemoteException) {
+          call.error.fillInStackTrace();
+          throw call.error;
+        } else { // local exception
+          throw wrapException(addr, call.error);
+        }
+      } else {
+        return call.value;
+      }
+    }
+  }
+
+  /**
+   * Take an IOException and the address we were trying to connect to
+   * and return an IOException with the input exception as the cause.
+   * The new exception provides the stack trace of the place where 
+   * the exception is thrown and some extra diagnostics information.
+   * If the exception is ConnectException or SocketTimeoutException, 
+   * return a new one of the same type; Otherwise return an IOException.
+   * 
+   * @param addr target address
+   * @param exception the relevant exception
+   * @return an exception to throw
+   */
+  private IOException wrapException(InetSocketAddress addr,
+                                         IOException exception) {
+    if (exception instanceof ConnectException) {
+      //connection refused; include the host:port in the error
+      return (ConnectException)new ConnectException(
+           "Call to " + addr + " failed on connection exception: " + exception)
+                    .initCause(exception);
+    } else if (exception instanceof SocketTimeoutException) {
+      return (SocketTimeoutException)new SocketTimeoutException(
+           "Call to " + addr + " failed on socket timeout exception: "
+                      + exception).initCause(exception);
+    } else {
+      return (IOException)new IOException(
+           "Call to " + addr + " failed on local exception: " + exception)
+                                 .initCause(exception);
+
+    }
+  }
+
+  /** Makes a set of calls in parallel.  Each parameter is sent to the
+   * corresponding address.  When all values are available, or have timed out
+   * or errored, the collected results are returned in an array.  The array
+   * contains nulls for calls that timed out or errored.  */
+  public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
+    throws IOException {
+    if (addresses.length == 0) return new Writable[0];
+
+    ParallelResults results = new ParallelResults(params.length);
+    synchronized (results) {
+      for (int i = 0; i < params.length; i++) {
+        ParallelCall call = new ParallelCall(params[i], results, i);
+        try {
+          Connection connection = getConnection(addresses[i], null, call);
+          connection.sendParam(call);             // send each parameter
+        } catch (IOException e) {
+          // log errors
+          LOG.info("Calling "+addresses[i]+" caught: " + 
+                   e.getMessage(),e);
+          results.size--;                         //  wait for one fewer result
+        }
+      }
+      while (results.count != results.size) {
+        try {
+          results.wait();                    // wait for all results
+        } catch (InterruptedException e) {}
+      }
+
+      return results.values;
+    }
+  }
+
+  /** Get a connection from the pool, or create a new one and add it to the
+   * pool, and register <code>call</code> with it.  Connections are keyed by
+   * address and ticket and are reused for equal keys.
+   * @param addr address of the server
+   * @param ticket user ticket; part of the connection key
+   * @param call the call to register with the connection
+   * @return a connection that accepted the call and whose streams have been
+   * set up
+   * @throws IOException if the client has been stopped
+   */
+  private Connection getConnection(InetSocketAddress addr, 
+                                   UserGroupInformation ticket,
+                                   Call call)
+                                   throws IOException {
+    if (!running.get()) {
+      // the client is stopped
+      throw new IOException("The client is stopped");
+    }
+    Connection connection;
+    /* we could avoid this allocation for each RPC by having a  
+     * connectionsId object and with set() method. We need to manage the
+     * refs for keys in HashMap properly. For now its ok.
+     */
+    ConnectionId remoteId = new ConnectionId(addr, ticket);
+    // Look up (or create) the connection and try to hand it the call; repeat
+    // for as long as addCall refuses it.
+    // NOTE(review): presumably addCall returns false once a connection has
+    // been marked for closing -- confirm against Connection.addCall.
+    do {
+      synchronized (connections) {
+        connection = connections.get(remoteId);
+        if (connection == null) {
+          connection = new Connection(remoteId);
+          connections.put(remoteId, connection);
+        }
+      }
+    } while (!connection.addCall(call));
+    
+    //we don't invoke the method below inside "synchronized (connections)"
+    //block above. The reason for that is if the server happens to be slow,
+    //it will take longer to establish a connection and that will slow the
+    //entire system down.
+    connection.setupIOstreams();
+    return connection;
+  }
+
+  /**
+   * Key of a pooled connection: the remote address together with the user
+   * ticket.  Client connections to servers are uniquely identified by
+   * &lt;remoteAddress, ticket&gt;.
+   */
+  private static class ConnectionId {
+    InetSocketAddress address;
+    UserGroupInformation ticket;
+    
+    ConnectionId(InetSocketAddress address, UserGroupInformation ticket) {
+      this.ticket = ticket;
+      this.address = address;
+    }
+    
+    InetSocketAddress getAddress() {
+      return this.address;
+    }
+
+    UserGroupInformation getTicket() {
+      return this.ticket;
+    }
+    
+    /** Equal when the addresses are equal and the tickets are the very same
+     * object. */
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof ConnectionId)) {
+        return false;
+      }
+      ConnectionId other = (ConnectionId) obj;
+      if (!address.equals(other.address)) {
+        return false;
+      }
+      // Note: tickets are deliberately compared by reference
+      return ticket == other.ticket;
+    }
+    
+    /** Mixes the address hash with the identity hash of the ticket, in line
+     * with the reference comparison done by equals. */
+    @Override
+    public int hashCode() {
+      final int ticketHash = System.identityHashCode(ticket);
+      return address.hashCode() ^ ticketHash;
+    }
+  }  
+}
\ No newline at end of file

Added: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=724238&view=auto
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (added)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Sun Dec  7 19:20:43 2008
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/** A simple RPC mechanism.
+ *
+ * This is a local hbase copy of the hadoop RPC so we can do things like
+ * address HADOOP-414 for hbase-only and try other hbase-specific
+ * optimizations like using our own version of ObjectWritable.  Class has been
+ * renamed to avoid confusing it w/ hadoop versions.
+ * <p>
+ * 
+ *
+ * A <i>protocol</i> is a Java interface.  All parameters and return types must
+ * be one of:
+ *
+ * <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>,
+ * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
+ * <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
+ *
+ * <li>a {@link String}; or</li>
+ *
+ * <li>a {@link Writable}; or</li>
+ *
+ * <li>an array of the above types</li> </ul>
+ *
+ * All methods in the protocol should throw only IOException.  No field data of
+ * the protocol instance is transmitted.
+ */
+public class HBaseRPC {
+  // The logger is deliberately named after the hadoop ipc package (keeping
+  // the class name) instead of this class's own package.  That way we don't
+  // get the logging of this class's invocations when DEBUG is enabled
+  // wholesale on the o.a.h.h. package.
+  private static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC");
+
+  /** Private so that the class cannot be instantiated from outside. */
+  private HBaseRPC() {
+    super();
+  }                                  // no public ctor
+
+
+  /** A method invocation, including the method name and its parameters.
+   * On the wire the method name is not sent as a string but as a one-byte
+   * code taken from the static tables below. */
+  private static class Invocation implements Writable, Configurable {
+    // Here, for hbase, we maintain two static maps of method names to code and
+    // vice versa.  Both are filled once by the static initializer below.
+    private static final Map<Byte, String> CODE_TO_METHODNAME =
+      new HashMap<Byte, String>();
+    private static final Map<String, Byte> METHODNAME_TO_CODE =
+      new HashMap<String, Byte>();
+    // Special code that means 'not-encoded'.
+    private static final byte NOT_ENCODED = 0;
+    static {
+      // Codes are handed out starting right after NOT_ENCODED, interface by
+      // interface in the order listed here, so this order is part of the
+      // encoding.
+      byte code = NOT_ENCODED + 1;
+      code = addToMap(VersionedProtocol.class, code);
+      code = addToMap(HMasterInterface.class, code);
+      code = addToMap(HMasterRegionInterface.class, code);
+      code = addToMap(TransactionalRegionInterface.class, code);
+    }
+    // End of hbase modifications.
+
+    private String methodName;
+    @SuppressWarnings("unchecked")
+    private Class[] parameterClasses;
+    private Object[] parameters;
+    private Configuration conf;
+
+    /** default constructor; the fields are filled in later by readFields */
+    public Invocation() {
+      super();
+    }
+
+    /**
+     * @param method the method being invoked; supplies the name and the
+     * declared parameter types
+     * @param parameters the argument values
+     */
+    public Invocation(Method method, Object[] parameters) {
+      this.methodName = method.getName();
+      this.parameterClasses = method.getParameterTypes();
+      this.parameters = parameters;
+    }
+
+    /** @return The name of the method invoked. */
+    public String getMethodName() { return methodName; }
+
+    /** @return The parameter classes. */
+    @SuppressWarnings("unchecked")
+    public Class[] getParameterClasses() { return parameterClasses; }
+
+    /** @return The parameter instances. */
+    public Object[] getParameters() { return parameters; }
+
+    /** Reads the method code, the parameter count and then each parameter. */
+    public void readFields(DataInput in) throws IOException {
+      byte code = in.readByte();
+      // NOTE(review): a code missing from the table leaves methodName null;
+      // nothing here rejects it.
+      methodName = CODE_TO_METHODNAME.get(Byte.valueOf(code));
+      parameters = new Object[in.readInt()];
+      parameterClasses = new Class[parameters.length];
+      // one holder is reused for all parameters; after each read it reports
+      // the declared class of the value just read
+      HbaseObjectWritable objectWritable = new HbaseObjectWritable();
+      for (int i = 0; i < parameters.length; i++) {
+        parameters[i] = HbaseObjectWritable.readObject(in, objectWritable,
+          this.conf);
+        parameterClasses[i] = objectWritable.getDeclaredClass();
+      }
+    }
+
+    /** Writes the method code, the parameter count and then each parameter
+     * together with its declared class. */
+    public void write(DataOutput out) throws IOException {
+      writeMethodNameCode(out, this.methodName);
+      out.writeInt(parameterClasses.length);
+      for (int i = 0; i < parameterClasses.length; i++) {
+        HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
+                                   conf);
+      }
+    }
+
+    /** @return the invocation rendered as <code>name(arg, arg, ...)</code> */
+    @Override
+    public String toString() {
+      StringBuilder buffer = new StringBuilder(256);
+      buffer.append(methodName);
+      buffer.append("(");
+      for (int i = 0; i < parameters.length; i++) {
+        if (i != 0)
+          buffer.append(", ");
+        buffer.append(parameters[i]);
+      }
+      buffer.append(")");
+      return buffer.toString();
+    }
+
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+
+    public Configuration getConf() {
+      return this.conf;
+    }
+    
+    // Hbase additions.
+    /*
+     * Registers a name/code pair in both tables.  A name that is already
+     * registered keeps its first code and the new pair is ignored.
+     */
+    private static void addToMap(final String name, final byte code) {
+      if (METHODNAME_TO_CODE.containsKey(name)) {
+        return;
+      }
+      METHODNAME_TO_CODE.put(name, Byte.valueOf(code));
+      CODE_TO_METHODNAME.put(Byte.valueOf(code), name);
+    }
+    
+    /*
+     * @param c Class whose methods we'll add to the map of methods to codes
+     * (and vice versa).
+     * @param code Current state of the byte code.
+     * @return State of <code>code</code> when this method is done.
+     */
+    private static byte addToMap(final Class<?> c, final byte code) {
+      byte localCode = code;
+      Method [] methods = c.getMethods();
+      // There are no guarantees about the order in which items are returned in
+      // so do a sort (Was seeing that sort was one way on one server and then
+      // another on different server).
+      Arrays.sort(methods, new Comparator<Method>() {
+        public int compare(Method left, Method right) {
+          return left.getName().compareTo(right.getName());
+        }
+      });
+      // The code advances for every method, including those whose name was
+      // registered before and is therefore skipped by addToMap(String, byte).
+      // NOTE(review): nothing guards against the byte running out of range.
+      for (int i = 0; i < methods.length; i++) {
+        addToMap(methods[i].getName(), localCode++);
+      }
+      return localCode;
+    }
+
+    /*
+     * Write out the code byte for the passed method name.
+     * @param out
+     * @param methodname
+     * @throws IOException
+     * Throws UnsupportedOperationException when the name has no code.
+     */
+    static void writeMethodNameCode(final DataOutput out, final String methodname)
+    throws IOException {
+      Byte code = METHODNAME_TO_CODE.get(methodname);
+      if (code == null) {
+        LOG.error("Unsupported type " + methodname);
+        throw new UnsupportedOperationException("No code for unexpected " +
+          methodname);
+      }
+      out.writeByte(code.byteValue());
+    }
+    // End of hbase additions.
+  }
+
+  /* Caches IPC clients, keyed by the socket factory they were built with. */
+  static private class ClientCache {
+    private Map<SocketFactory, HBaseClient> clients =
+      new HashMap<SocketFactory, HBaseClient>();
+
+    /**
+     * Returns the cached IPC client for the given SocketFactory, constructing
+     * and caching one first if none exists yet.  The reference count of an
+     * already cached client is incremented.
+     * 
+     * @param conf Configuration; only used when a new client is constructed
+     * @param factory socket factory, also the cache key
+     * @return an IPC client
+     */
+    private synchronized HBaseClient getClient(Configuration conf,
+        SocketFactory factory) {
+      // Clients are shared per socket factory, so the configuration of
+      // whoever asks first is the one the shared client is built with.
+      HBaseClient cached = clients.get(factory);
+      if (cached != null) {
+        cached.incCount();
+        return cached;
+      }
+      // Make an hbase client instead of hadoop Client.
+      HBaseClient created =
+        new HBaseClient(HbaseObjectWritable.class, conf, factory);
+      clients.put(factory, created);
+      return created;
+    }
+
+    /**
+     * Same as the two-argument variant, using the JVM default SocketFactory.
+     * 
+     * @param conf Configuration
+     * @return an IPC client
+     */
+    private synchronized HBaseClient getClient(Configuration conf) {
+      return getClient(conf, SocketFactory.getDefault());
+    }
+
+    /**
+     * Releases one reference to a RPC client.
+     * The client is dropped from the cache and stopped only when its
+     * reference count becomes zero.
+     */
+    private void stopClient(HBaseClient client) {
+      synchronized (this) {
+        client.decCount();
+        if (client.isZeroReference()) {
+          clients.remove(client.getSocketFactory());
+        }
+      }
+      // the stop itself happens outside of the cache lock
+      if (client.isZeroReference()) {
+        client.stop();
+      }
+    }
+  }
+
+  // The one cache of IPC clients shared by all proxies and parallel calls.
+  private static ClientCache CLIENTS = new ClientCache();
+  
+  /** Invocation handler behind every client-side proxy: turns each method
+   * call into an RPC to a fixed server address. */
+  private static class Invoker implements InvocationHandler {
+    private InetSocketAddress address;
+    private UserGroupInformation ticket;
+    private HBaseClient client;
+    private boolean isClosed = false;
+
+    /**
+     * @param address address of the server to call
+     * @param ticket user ticket passed along with every call
+     * @param conf configuration handed to the client cache
+     * @param factory socket factory selecting the cached client
+     */
+    public Invoker(InetSocketAddress address, UserGroupInformation ticket, 
+                   Configuration conf, SocketFactory factory) {
+      this.address = address;
+      this.ticket = ticket;
+      this.client = CLIENTS.getClient(conf, factory);
+    }
+
+    /** Sends the invocation to the server and returns the unwrapped result;
+     * at debug level the elapsed time is logged as well. */
+    public Object invoke(@SuppressWarnings("unused") Object proxy,
+        Method method, Object[] args)
+      throws Throwable {
+      final boolean timed = LOG.isDebugEnabled();
+      final long begin = timed ? System.currentTimeMillis() : 0;
+
+      Invocation invocation = new Invocation(method, args);
+      HbaseObjectWritable result =
+        (HbaseObjectWritable) client.call(invocation, address, ticket);
+
+      if (timed) {
+        long elapsed = System.currentTimeMillis() - begin;
+        LOG.debug("Call: " + method.getName() + " " + elapsed);
+      }
+      return result.get();
+    }
+    
+    /* close the IPC client that's responsible for this invoker's RPCs;
+     * only the first invocation releases the client */ 
+    synchronized private void close() {
+      if (isClosed) {
+        return;
+      }
+      isClosed = true;
+      CLIENTS.stopClient(client);
+    }
+  }
+
+  /**
+   * Thrown when client and server disagree on the version of an RPC protocol.
+   */
+  @SuppressWarnings("serial")
+  public static class VersionMismatch extends IOException {
+    private String interfaceName;
+    private long clientVersion;
+    private long serverVersion;
+    
+    /**
+     * Create a version mismatch exception
+     * @param interfaceName the name of the protocol mismatch
+     * @param clientVersion the client's version of the protocol
+     * @param serverVersion the server's version of the protocol
+     */
+    public VersionMismatch(String interfaceName, long clientVersion,
+                           long serverVersion) {
+      super("Protocol " + interfaceName + " version mismatch. (client = " +
+            clientVersion + ", server = " + serverVersion + ")");
+      this.serverVersion = serverVersion;
+      this.clientVersion = clientVersion;
+      this.interfaceName = interfaceName;
+    }
+    
+    /**
+     * Get the interface name
+     * @return the java class name 
+     *          (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
+     */
+    public String getInterfaceName() {
+      return this.interfaceName;
+    }
+    
+    /**
+     * @return the version the client asked for
+     */
+    public long getClientVersion() {
+      return this.clientVersion;
+    }
+    
+    /**
+     * @return the version reported by the server
+     */
+    public long getServerVersion() {
+      return this.serverVersion;
+    }
+  }
+  
+  /**
+   * Obtains a proxy, retrying once a second while the server refuses the
+   * connection or the connect times out.  Refused connections are counted
+   * and, unless <code>maxAttempts</code> is negative, the method gives up
+   * once that many have been seen.  Timeouts are retried without limit.
+   * @param protocol
+   * @param clientVersion
+   * @param addr
+   * @param conf
+   * @param maxAttempts number of refused connections to tolerate; negative
+   * means retry forever
+   * @return proxy
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public static VersionedProtocol waitForProxy(Class protocol,
+                                               long clientVersion,
+                                               InetSocketAddress addr,
+                                               Configuration conf,
+                                               int maxAttempts
+                                               ) throws IOException {
+    // HBase does limited number of reconnects which is different from hadoop.
+    int refusedConnects = 0;
+    for (;;) {
+      try {
+        return getProxy(protocol, clientVersion, addr, conf);
+      } catch (ConnectException refused) {  // server has not been started
+        LOG.info("Server at " + addr + " not available yet, Zzzzz...");
+        if (maxAttempts >= 0) {
+          refusedConnects++;
+          if (refusedConnects >= maxAttempts) {
+            LOG.info("Server at " + addr + " could not be reached after " +
+                    refusedConnects + " tries, giving up.");
+            throw new RetriesExhaustedException(addr.toString(),
+                    "unknown".getBytes(), "unknown".getBytes(),
+                    refusedConnects - 1, new ArrayList<Throwable>());
+          }
+        }
+      } catch (SocketTimeoutException timedOut) {  // server is busy
+        LOG.info("Problem connecting to server: " + addr);
+      }
+      // pause before the next attempt
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        // IGNORE
+      }
+    }
+  }
+
+  /**
+   * Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address.  Delegates to the variant
+   * taking a ticket, passing a null ticket.
+   *
+   * @param protocol interface the proxy implements
+   * @param clientVersion protocol version the client expects
+   * @param addr address of the server
+   * @param conf configuration
+   * @param factory socket factory
+   * @return proxy
+   * @throws IOException
+   */
+  public static VersionedProtocol getProxy(Class<?> protocol,
+      long clientVersion, InetSocketAddress addr, Configuration conf,
+      SocketFactory factory) throws IOException {
+    return getProxy(protocol, clientVersion, addr, null, conf, factory);
+  }
+  
+  /**
+   * Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address.  Before the proxy is returned
+   * the server is asked for its protocol version, which has to equal
+   * <code>clientVersion</code>.
+   *
+   * @param protocol interface the proxy implements
+   * @param clientVersion protocol version the client expects
+   * @param addr address of the server
+   * @param ticket user ticket passed along with every call
+   * @param conf configuration
+   * @param factory socket factory
+   * @return proxy
+   * @throws IOException a {@link VersionMismatch} when the versions differ,
+   * or whatever the version query itself throws
+   */
+  public static VersionedProtocol getProxy(Class<?> protocol,
+      long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
+      Configuration conf, SocketFactory factory)
+  throws IOException {    
+    VersionedProtocol proxy =
+        (VersionedProtocol) Proxy.newProxyInstance(
+            protocol.getClassLoader(), new Class[] { protocol },
+            new Invoker(addr, ticket, conf, factory));
+
+    // the first call through the proxy doubles as the version handshake
+    final String protocolName = protocol.getName();
+    final long serverVersion =
+        proxy.getProtocolVersion(protocolName, clientVersion);
+    if (serverVersion != clientVersion) {
+      throw new VersionMismatch(protocolName, clientVersion, serverVersion);
+    }
+    return proxy;
+  }
+
+  /**
+   * Construct a client-side proxy object with the default SocketFactory, as
+   * returned by <code>NetUtils.getDefaultSocketFactory(conf)</code>.
+   * 
+   * @param protocol interface the proxy implements
+   * @param clientVersion protocol version the client expects
+   * @param addr address of the server
+   * @param conf configuration; also used to pick the socket factory
+   * @return a proxy instance
+   * @throws IOException
+   */
+  public static VersionedProtocol getProxy(Class<?> protocol,
+      long clientVersion, InetSocketAddress addr, Configuration conf)
+      throws IOException {
+
+    return getProxy(protocol, clientVersion, addr, conf, NetUtils
+        .getDefaultSocketFactory(conf));
+  }
+
+  /**
+   * Stop this proxy and release its invoker's resource
+   * @param proxy the proxy to be stopped
+   */
+  public static void stopProxy(VersionedProtocol proxy) {
+    if (proxy!=null) {
+      ((Invoker)Proxy.getInvocationHandler(proxy)).close();
+    }
+  }
+
+  /**
+   * Expert: Make multiple, parallel calls to a set of servers.
+   *
+   * @param method
+   * @param params
+   * @param addrs
+   * @param conf
+   * @return values
+   * @throws IOException
+   */
+  public static Object[] call(Method method, Object[][] params,
+                              InetSocketAddress[] addrs, Configuration conf)
+    throws IOException {
+
+    Invocation[] invocations = new Invocation[params.length];
+    for (int i = 0; i < params.length; i++)
+      invocations[i] = new Invocation(method, params[i]);
+    HBaseClient client = CLIENTS.getClient(conf);
+    try {
+    Writable[] wrappedValues = client.call(invocations, addrs);
+    
+    if (method.getReturnType() == Void.TYPE) {
+      return null;
+    }
+
+    Object[] values =
+      (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
+    for (int i = 0; i < values.length; i++)
+      if (wrappedValues[i] != null)
+        values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
+    
+    return values;
+    } finally {
+      CLIENTS.stopClient(client);
+    }
+  }
+
+  /**
+   * Construct a server for a protocol implementation instance listening on a
+   * port and address.  Uses a single handler and no verbose logging.
+   *
+   * @param instance the instance whose methods will be called
+   * @param bindAddress the address to bind on
+   * @param port the port to listen on
+   * @param conf the configuration to use
+   * @return Server
+   * @throws IOException
+   */
+  public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) 
+    throws IOException {
+    return getServer(instance, bindAddress, port, 1, false, conf);
+  }
+
+  /**
+   * Construct a server for a protocol implementation instance listening on a
+   * port and address.
+   *
+   * @param instance the instance whose methods will be called
+   * @param bindAddress the address to bind on
+   * @param port the port to listen on
+   * @param numHandlers the number of method handler threads to run
+   * @param verbose whether each call should be logged
+   * @param conf the configuration to use
+   * @return Server
+   * @throws IOException
+   */
+  public static Server getServer(final Object instance, final String bindAddress, final int port,
+                                 final int numHandlers,
+                                 final boolean verbose, Configuration conf) 
+    throws IOException {
+    return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
+  }
+
+  /** An RPC Server. */
+  public static class Server extends HBaseServer {
+    private Object instance;
+    private Class<?> implementation;
+    private boolean verbose;
+
+    /**
+     * Construct an RPC server with a single handler and no verbose logging.
+     * @param instance the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     * @throws IOException
+     */
+    public Server(Object instance, Configuration conf, String bindAddress, int port) 
+      throws IOException {
+      this(instance, conf,  bindAddress, port, 1, false);
+    }
+    
+    private static String classNameBase(String className) {
+      String[] names = className.split("\\.", -1);
+      if (names == null || names.length == 0) {
+        return className;
+      }
+      return names[names.length-1];
+    }
+    
+    /** Construct an RPC server.
+     * @param instance the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     * @param numHandlers the number of method handler threads to run
+     * @param verbose whether each call should be logged
+     * @throws IOException
+     */
+    public Server(Object instance, Configuration conf, String bindAddress,  int port,
+                  int numHandlers, boolean verbose) throws IOException {
+      super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
+      this.instance = instance;
+      this.implementation = instance.getClass();
+      this.verbose = verbose;
+    }
+
+    @Override
+    public Writable call(Writable param, long receivedTime) throws IOException {
+      try {
+        Invocation call = (Invocation)param;
+        if (verbose) log("Call: " + call);
+        Method method =
+          implementation.getMethod(call.getMethodName(),
+                                   call.getParameterClasses());
+
+        long startTime = System.currentTimeMillis();
+        Object value = method.invoke(instance, call.getParameters());
+        int processingTime = (int) (System.currentTimeMillis() - startTime);
+        int qTime = (int) (startTime-receivedTime);
+        LOG.debug("Served: " + call.getMethodName() +
+            " queueTime= " + qTime +
+            " procesingTime= " + processingTime);
+        rpcMetrics.rpcQueueTime.inc(qTime);
+        rpcMetrics.rpcProcessingTime.inc(processingTime);
+
+	MetricsTimeVaryingRate m = rpcMetrics.metricsList.get(call.getMethodName());
+
+	if (m != null) {
+		m.inc(processingTime);
+	}
+	else {
+		rpcMetrics.metricsList.put(call.getMethodName(), new MetricsTimeVaryingRate(call.getMethodName()));
+		m = rpcMetrics.metricsList.get(call.getMethodName());
+		m.inc(processingTime);
+	}
+
+        if (verbose) log("Return: "+value);
+
+        return new HbaseObjectWritable(method.getReturnType(), value);
+
+      } catch (InvocationTargetException e) {
+        Throwable target = e.getTargetException();
+        if (target instanceof IOException) {
+          throw (IOException)target;
+        } else {
+          IOException ioe = new IOException(target.toString());
+          ioe.setStackTrace(target.getStackTrace());
+          throw ioe;
+        }
+      } catch (Throwable e) {
+        IOException ioe = new IOException(e.toString());
+        ioe.setStackTrace(e.getStackTrace());
+        throw ioe;
+      }
+    }
+  }
+
+  private static void log(String value) {
+    if (value!= null && value.length() > 55)
+      value = value.substring(0, 55)+"...";
+    LOG.info(value);
+  }
+}

Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java Sun Dec  7 19:20:43 2008
@@ -63,7 +63,8 @@
    * <li>Version 11: Changed getClosestRowBefore signature.</li>
    * <li>Version 12: HServerLoad extensions (HBASE-1018).</li>
    * <li>Version 13: HBASE-847</li>
+   * <li>Version 14: HBASE-900</li>
    * </ul>
    */
-  public static final long versionID = 13L;
+  public static final long versionID = 14L;
 }

Added: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java?rev=724238&view=auto
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (added)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java Sun Dec  7 19:20:43 2008
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+
+/**
+ * Maintains the various RPC statistics and publishes them through the
+ * metrics interfaces.
+ * <p>
+ * NOTE(review): earlier documentation said this class also registers the JMX
+ * MBean for RPC; no such registration is visible in this class - confirm
+ * whether it happens elsewhere.
+ * <p>
+ * This class has a number of metrics variables that are publicly accessible;
+ * these variables (objects) have methods to update their values;
+ * for example:
+ *  <p> {@link #rpcQueueTime}.inc(time)
+ *
+ */
+public class HBaseRpcMetrics implements Updater {
+  private static final Log LOG = LogFactory.getLog(HBaseRpcMetrics.class);
+  private final MetricsRecord metricsRecord;
+
+  /**
+   * Creates the "metrics" record in the "rpc" metrics context, tags it with
+   * the port, and registers this object as an updater of that context.
+   *
+   * @param hostName host name; only written to the log here
+   * @param port port the RPC server listens on; stored as the "port" tag
+   * @param server the owning server; not used by this constructor
+   */
+  public HBaseRpcMetrics(String hostName, String port, HBaseServer server) {
+    MetricsContext context = MetricsUtil.getContext("rpc");
+    metricsRecord = MetricsUtil.createRecord(context, "metrics");
+
+    metricsRecord.setTag("port", port);
+
+    LOG.info("Initializing RPC Metrics with hostName=" 
+        + hostName + ", port=" + port);
+
+    context.registerUpdater(this);
+  }
+
+
+  /**
+   * The metrics variables are public:
+   *  - they can be set directly by calling their set/inc methods
+   *  - they can also be read directly - e.g. JMX does this.
+   */
+
+  public MetricsTimeVaryingRate rpcQueueTime = new MetricsTimeVaryingRate("RpcQueueTime");
+  public MetricsTimeVaryingRate rpcProcessingTime = new MetricsTimeVaryingRate("RpcProcessingTime");
+
+  /**
+   * Per-method processing-time rates, keyed by RPC method name. This is a
+   * synchronized map: compound operations and iteration must hold the map's
+   * own monitor.
+   */
+  public Map <String, MetricsTimeVaryingRate> metricsList = Collections.synchronizedMap(new HashMap<String, MetricsTimeVaryingRate>());
+
+
+
+  /**
+   * Push the metrics to the monitoring subsystem on doUpdates() call.
+   *
+   * @param context the metrics context requesting the update; not used here
+   */
+  public void doUpdates(MetricsContext context) {
+    rpcQueueTime.pushMetric(metricsRecord);
+    rpcProcessingTime.pushMetric(metricsRecord);
+
+    // Propagate every per-method rate. Iterating a synchronized map is only
+    // safe while holding the map itself as the lock.
+    synchronized (metricsList) {
+      for (MetricsTimeVaryingRate rate : metricsList.values()) {
+        rate.pushMetric(metricsRecord);
+      }
+    }
+    metricsRecord.update();
+  }
+
+  /** Shuts down the metrics; currently there is nothing to release. */
+  public void shutdown() {
+    // Nothing to do
+  }
+}
\ No newline at end of file