You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2008/07/24 00:13:23 UTC

svn commit: r679212 - in /hadoop/hbase/trunk: CHANGES.txt src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/ipc/HBaseClient.java

Author: stack
Date: Wed Jul 23 15:13:23 2008
New Revision: 679212

URL: http://svn.apache.org/viewvc?rev=679212&view=rev
Log:
HBASE-770 Update HBaseRPC to match hadoop 0.17 RPC

Added:
    hadoop/hbase/trunk/src/java/org/apache/hadoop/ipc/
    hadoop/hbase/trunk/src/java/org/apache/hadoop/ipc/HBaseClient.java
Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=679212&r1=679211&r2=679212&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Wed Jul 23 15:13:23 2008
@@ -215,6 +215,7 @@
    HBASE-769   TestMasterAdmin fails throwing RegionOfflineException when we're
                expecting IllegalStateException
    HBASE-766   FileNotFoundException trying to load HStoreFile 'data'
+   HBASE-770   Update HBaseRPC to match hadoop 0.17 RPC
    
   IMPROVEMENTS
    HBASE-559   MR example job to count table rows

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java?rev=679212&r1=679211&r2=679212&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java Wed Jul 23 15:13:23 2008
@@ -18,42 +18,46 @@
 
 package org.apache.hadoop.hbase.ipc;
 
-import java.lang.reflect.Proxy;
-import java.lang.reflect.Method;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.lang.reflect.Array;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
-
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
-import java.io.*;
 import java.util.ArrayList;
-import java.util.Map;
 import java.util.HashMap;
-import java.util.Collection;
+import java.util.Map;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.logging.*;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.HBaseClient;
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.conf.*;
 
 /** A simple RPC mechanism.
- * 
+ *
  * This is a local hbase copy of the hadoop RPC so we can do things like
  * address HADOOP-414 for hbase-only and try other hbase-specific
  * optimizations like using our own version of ObjectWritable.  Class has been
  * renamed to avoid confusing it w/ hadoop versions.
+ * <p>
  * 
- * <p>Below are continued the class comments from hadoop RPC class.
  *
  * A <i>protocol</i> is a Java interface.  All parameters and return types must
  * be one of:
@@ -70,12 +74,13 @@
  *
  * All methods in the protocol should throw only IOException.  No field data of
  * the protocol instance is transmitted.
- * 
- * @see org.apache.hadoop.ipc.RPC
  */
 public class HbaseRPC {
+  // Leave this out in the hadoop ipc package but keep class name.  Do this
+  // so that we don't get the logging of this class's invocations by doing our
+  // blanket enabling DEBUG on the o.a.h.h. package.
   private static final Log LOG =
-    LogFactory.getLog("org.apache.hadoop.ipc.RPC");
+    LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC");
 
   private HbaseRPC() {}                                  // no public ctor
 
@@ -108,9 +113,9 @@
       methodName = Text.readString(in);
       parameters = new Object[in.readInt()];
       parameterClasses = new Class[parameters.length];
-      HbaseObjectWritable objectWritable = new HbaseObjectWritable();
+      ObjectWritable objectWritable = new ObjectWritable();
       for (int i = 0; i < parameters.length; i++) {
-        parameters[i] = HbaseObjectWritable.readObject(in, objectWritable, this.conf);
+        parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
         parameterClasses[i] = objectWritable.getDeclaredClass();
       }
     }
@@ -119,7 +124,7 @@
       Text.writeString(out, methodName);
       out.writeInt(parameterClasses.length);
       for (int i = 0; i < parameterClasses.length; i++) {
-        HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
+        ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
                                    conf);
       }
     }
@@ -147,83 +152,93 @@
 
   }
 
-  private static Map<SocketFactory, Client> CLIENTS =
+  /* Cache a client using its socket factory as the hash key */
+  static private class ClientCache {
+    private Map<SocketFactory, Client> clients =
       new HashMap<SocketFactory, Client>();
 
-  private static synchronized Client getClient(Configuration conf,
-      SocketFactory factory) {
-    // Construct & cache client.  The configuration is only used for timeout,
-    // and Clients have connection pools.  So we can either (a) lose some
-    // connection pooling and leak sockets, or (b) use the same timeout for all
-    // configurations.  Since the IPC is usually intended globally, not
-    // per-job, we choose (a).
-    Client client = CLIENTS.get(factory);
-    if (client == null) {
-      client = new Client(HbaseObjectWritable.class, conf, factory);
-      CLIENTS.put(factory, client);
+    /**
+     * Construct & cache an IPC client with the user-provided SocketFactory 
+     * if no cached client exists.
+     * 
+     * @param conf Configuration
+     * @return an IPC client
+     */
+    private synchronized Client getClient(Configuration conf,
+        SocketFactory factory) {
+      // Construct & cache client.  The configuration is only used for timeout,
+      // and Clients have connection pools.  So we can either (a) lose some
+      // connection pooling and leak sockets, or (b) use the same timeout for all
+      // configurations.  Since the IPC is usually intended globally, not
+      // per-job, we choose (a).
+      Client client = clients.get(factory);
+      if (client == null) {
+        client = new HBaseClient(ObjectWritable.class, conf, factory);
+        clients.put(factory, client);
+      } else {
+        ((HBaseClient)client).incCount();
+      }
+      return client;
     }
-    return client;
-  }
-  
-  /**
-   * Construct & cache client with the default SocketFactory.
-   * @param conf
-   * @return
-   */
-  private static Client getClient(Configuration conf) {
-    return getClient(conf, SocketFactory.getDefault());
-  }
-
-  /**
-   * Stop all RPC client connections
-   */
-  public static synchronized void stopClient(){
-    for (Client client : CLIENTS.values())
-      client.stop();
-    CLIENTS.clear();
-  }
 
-  /*
-   * remove specified client from the list of clients.
-   */
-  static synchronized void removeClients() {
-    CLIENTS.clear();
-  }
+    /**
+     * Construct & cache an IPC client with the default SocketFactory 
+     * if no cached client exists.
+     * 
+     * @param conf Configuration
+     * @return an IPC client
+     */
+    private synchronized Client getClient(Configuration conf) {
+      return getClient(conf, SocketFactory.getDefault());
+    }
 
-  static synchronized Collection allClients() {
-    return CLIENTS.values();
+    /**
+     * Stop an RPC client connection.
+     * An RPC client is closed only when its reference count becomes zero.
+     */
+    private void stopClient(Client client) {
+      synchronized (this) {
+        ((HBaseClient)client).decCount();
+        if (((HBaseClient)client).isZeroReference()) {
+          clients.remove(((HBaseClient)client).getSocketFactory());
+        }
+      }
+      if (((HBaseClient)client).isZeroReference()) {
+        client.stop();
+      }
+    }
   }
 
+  private static ClientCache CLIENTS=new ClientCache();
+  
   private static class Invoker implements InvocationHandler {
     private InetSocketAddress address;
     private UserGroupInformation ticket;
     private Client client;
+    private boolean isClosed = false;
 
     public Invoker(InetSocketAddress address, UserGroupInformation ticket, 
                    Configuration conf, SocketFactory factory) {
       this.address = address;
       this.ticket = ticket;
-      this.client = getClient(conf, factory);
+      this.client = CLIENTS.getClient(conf, factory);
     }
 
     public Object invoke(Object proxy, Method method, Object[] args)
-      throws IOException {
+      throws Throwable {
       long startTime = System.currentTimeMillis();
-      try {
-        HbaseObjectWritable value = (HbaseObjectWritable)
-          client.call(new Invocation(method, args), address, ticket);
-        long callTime = System.currentTimeMillis() - startTime;
-        LOG.debug("Call: " + method.getName() + " " + callTime);
-        return value.get();
-      } catch (Throwable t) {
-        IOException e;
-        if (t instanceof IOException) {
-          e = (IOException) t;
-        } else {
-          e = new IOException("error during RPC call");
-          e.initCause(t.getCause());
-        }
-        throw e;
+      ObjectWritable value = (ObjectWritable)
+        client.call(new Invocation(method, args), address, ticket);
+      long callTime = System.currentTimeMillis() - startTime;
+      LOG.debug("Call: " + method.getName() + " " + callTime);
+      return value.get();
+    }
+    
+    /* close the IPC client that's responsible for this invoker's RPCs */ 
+    synchronized private void close() {
+      if (!isClosed) {
+        isClosed = true;
+        CLIENTS.stopClient(client);
       }
     }
   }
@@ -261,7 +276,7 @@
     }
     
     /**
-     * Get the client's prefered version
+     * Get the client's preferred version
      */
     public long getClientVersion() {
       return clientVersion;
@@ -275,28 +290,25 @@
     }
   }
   
-  /** 
-   * @param maxAttempts the number of times that getProxy() should be called before 
-   * giving up.  If a negative number is passed, it will retry indefinitely.
-   */
   public static VersionedProtocol waitForProxy(Class protocol,
                                                long clientVersion,
                                                InetSocketAddress addr,
                                                Configuration conf,
-                                               int maxAttempts) throws IOException {
+                                               int maxAttempts
+                                               ) throws IOException {
     int reconnectAttempts = 0;
-	while (true) {
+    while (true) {
       try {
         return getProxy(protocol, clientVersion, addr, conf);
       } catch(ConnectException se) {  // namenode has not been started
         LOG.info("Server at " + addr + " not available yet, Zzzzz...");
         if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) {
-        	LOG.info("Server at " + addr + " could not be reached after " + 
-        			reconnectAttempts + " tries, giving up.");
-        	throw new RetriesExhaustedException(addr.toString(), "unknown".getBytes(), 
-        			"unknown".getBytes(), reconnectAttempts - 1, 
-        			new ArrayList<Throwable>());
-        }
+          LOG.info("Server at " + addr + " could not be reached after " +
+                  reconnectAttempts + " tries, giving up.");
+          throw new RetriesExhaustedException(addr.toString(), "unknown".getBytes(),
+                  "unknown".getBytes(), reconnectAttempts - 1,
+                  new ArrayList<Throwable>());
+      }
       } catch(SocketTimeoutException te) {  // namenode is busy
         LOG.info("Problem connecting to server: " + addr);
       }
@@ -353,6 +365,16 @@
         .getDefaultSocketFactory(conf));
   }
 
+  /**
+   * Stop this proxy and release its invoker's resources
+   * @param proxy the proxy to be stopped
+   */
+  public static void stopProxy(VersionedProtocol proxy) {
+    if (proxy!=null) {
+      ((Invoker)Proxy.getInvocationHandler(proxy)).close();
+    }
+  }
+
   /** Expert: Make multiple, parallel calls to a set of servers. */
   public static Object[] call(Method method, Object[][] params,
                               InetSocketAddress[] addrs, Configuration conf)
@@ -361,7 +383,9 @@
     Invocation[] invocations = new Invocation[params.length];
     for (int i = 0; i < params.length; i++)
       invocations[i] = new Invocation(method, params[i]);
-    Writable[] wrappedValues = getClient(conf).call(invocations, addrs);
+    Client client = CLIENTS.getClient(conf);
+    try {
+    Writable[] wrappedValues = client.call(invocations, addrs);
     
     if (method.getReturnType() == Void.TYPE) {
       return null;
@@ -371,9 +395,12 @@
       (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
     for (int i = 0; i < values.length; i++)
       if (wrappedValues[i] != null)
-        values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
+        values[i] = ((ObjectWritable)wrappedValues[i]).get();
     
     return values;
+    } finally {
+      CLIENTS.stopClient(client);
+    }
   }
 
   /** Construct a server for a protocol implementation instance listening on a
@@ -408,7 +435,15 @@
       throws IOException {
       this(instance, conf,  bindAddress, port, 1, false);
     }
-
+    
+    private static String classNameBase(String className) {
+      String[] names = className.split("\\.", -1);
+      if (names == null || names.length == 0) {
+        return className;
+      }
+      return names[names.length-1];
+    }
+    
     /** Construct an RPC server.
      * @param instance the instance whose methods will be called
      * @param conf the configuration to use
@@ -419,13 +454,13 @@
      */
     public Server(Object instance, Configuration conf, String bindAddress,  int port,
                   int numHandlers, boolean verbose) throws IOException {
-      super(bindAddress, port, Invocation.class, numHandlers, conf);
+      super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
       this.instance = instance;
       this.implementation = instance.getClass();
       this.verbose = verbose;
     }
 
-    public Writable call(Writable param, long receiveTime) throws IOException {
+    public Writable call(Writable param, long receivedTime) throws IOException {
       try {
         Invocation call = (Invocation)param;
         if (verbose) log("Call: " + call);
@@ -436,11 +471,28 @@
 
         long startTime = System.currentTimeMillis();
         Object value = method.invoke(instance, call.getParameters());
-        long callTime = System.currentTimeMillis() - startTime;
-        LOG.debug("Served: " + call.getMethodName() + " " + callTime);
+        int processingTime = (int) (System.currentTimeMillis() - startTime);
+        int qTime = (int) (startTime-receivedTime);
+        LOG.debug("Served: " + call.getMethodName() +
+            " queueTime= " + qTime +
+            " procesingTime= " + processingTime);
+        rpcMetrics.rpcQueueTime.inc(qTime);
+        rpcMetrics.rpcProcessingTime.inc(processingTime);
+
+	MetricsTimeVaryingRate m = rpcMetrics.metricsList.get(call.getMethodName());
+
+	if (m != null) {
+		m.inc(processingTime);
+	}
+	else {
+		rpcMetrics.metricsList.put(call.getMethodName(), new MetricsTimeVaryingRate(call.getMethodName()));
+		m = rpcMetrics.metricsList.get(call.getMethodName());
+		m.inc(processingTime);
+	}
+
         if (verbose) log("Return: "+value);
 
-        return new HbaseObjectWritable(method.getReturnType(), value);
+        return new ObjectWritable(method.getReturnType(), value);
 
       } catch (InvocationTargetException e) {
         Throwable target = e.getTargetException();

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/ipc/HBaseClient.java?rev=679212&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/ipc/HBaseClient.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/ipc/HBaseClient.java Wed Jul 23 15:13:23 2008
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Subclass of hadoop's Client just so we can make some methods accessible
+ * in {@link org.apache.hadoop.hbase.ipc.HbaseRPC}
+ */
+public class HBaseClient extends Client {
+  public HBaseClient(Class valueClass, Configuration conf, SocketFactory factory) {
+    super(valueClass, conf, factory);
+  }
+
+  public HBaseClient(Class<?> valueClass, Configuration conf) {
+    super(valueClass, conf);
+  }
+  
+  @Override
+  public void incCount() {
+    super.incCount();
+  }
+  
+  @Override
+  public void decCount() {
+    super.decCount();
+  }
+  
+  @Override
+  public boolean isZeroReference() {
+    return super.isZeroReference();
+  }
+  
+  @Override
+  public SocketFactory getSocketFactory() {
+    return super.getSocketFactory();
+  }
+}
\ No newline at end of file