Posted to commits@hbase.apache.org by ap...@apache.org on 2008/12/08 04:20:44 UTC
svn commit: r724238 [1/3] - in /hadoop/hbase/branches/0.19_on_hadoop_0.18:
./ conf/ src/java/org/apache/hadoop/hbase/
src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/ipc/
src/java/org/apach...
Author: apurtell
Date: Sun Dec 7 19:20:43 2008
New Revision: 724238
URL: http://svn.apache.org/viewvc?rev=724238&view=rev
Log:
merge up to trunk (rev 724237)
Added:
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/package.html
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/rest/package.html
Removed:
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/ipc/
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/test/org/apache/hadoop/hbase/TestGlobalMemcacheLimit.java
Modified:
hadoop/hbase/branches/0.19_on_hadoop_0.18/CHANGES.txt
hadoop/hbase/branches/0.19_on_hadoop_0.18/conf/hbase-default.xml
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/Chore.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HTable.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/io/BloomFilterMapFile.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/master/HMaster.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/util/Sleeper.java
hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/onelab/filter/Filter.java
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/CHANGES.txt?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/CHANGES.txt (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/CHANGES.txt Sun Dec 7 19:20:43 2008
@@ -90,12 +90,20 @@
HBASE-1041 Migration throwing NPE
HBASE-1042 OOME but we don't abort; two part commit.
HBASE-927 We don't recover if HRS hosting -ROOT-/.META. goes down
+ HBASE-1029 REST wiki documentation incorrect
+ (Sishen Freecity via Stack)
HBASE-1043 Removing @Override attributes where they are no longer needed.
(Ryan Smith via Jim Kellerman)
HBASE-927 We don't recover if HRS hosting -ROOT-/.META. goes down -
(fix bug in createTable which caused tests to fail)
+ HBASE-1039 Compaction fails if bloomfilters are enabled
+ HBASE-1027 Make global flusher check work with percentages rather than
+ hard code memory sizes
+ HBASE-1000 Sleeper.sleep does not go back to sleep when interrupted
+ and no stop flag given.
+ HBASE-900 Regionserver memory leak causing OOME during relatively
+ modest bulk importing; part 1
-
IMPROVEMENTS
HBASE-901 Add a limit to key length, check key and value length on client side
HBASE-890 Alter table operation and also related changes in REST interface
@@ -158,6 +166,8 @@
HBASE-1030 Bit of polish on HBASE-1018
HBASE-847 new API: HTable.getRow with numVersion specified
(Doğacan Güney via Stack)
+ HBASE-1048 HLog: Found 0 logs to remove out of total 1450; oldest
+ outstanding seqnum is 162297053 from region -ROOT-,,0
NEW FEATURES
HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/conf/hbase-default.xml?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/conf/hbase-default.xml (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/conf/hbase-default.xml Sun Dec 7 19:20:43 2008
@@ -191,19 +191,19 @@
</description>
</property>
<property>
- <name>hbase.regionserver.globalMemcacheLimit</name>
- <value>536870912</value>
+ <name>hbase.regionserver.globalMemcache.upperLimit</name>
+ <value>0.4</value>
<description>Maximum size of all memcaches in a region server before new
- updates are blocked and flushes are forced. Defaults to 512MB.
+ updates are blocked and flushes are forced. Defaults to 40% of heap.
</description>
</property>
<property>
- <name>hbase.regionserver.globalMemcacheLimitlowMark</name>
- <value>256435456</value>
+ <name>hbase.regionserver.globalMemcache.lowerLimit</name>
+ <value>0.25</value>
<description>When memcaches are being forced to flush to make room in
- memory, keep flushing until we hit this mark. Defaults to 256MB. Setting
- this value equal to hbase.regionserver.globalmemcachelimit causes the
- minimum possible flushing to occur when updates are blocked due to
+ memory, keep flushing until we hit this mark. Defaults to 25% of heap.
+ Setting this value equal to hbase.regionserver.globalMemcache.upperLimit
+ causes the minimum possible flushing to occur when updates are blocked due to
memcache limiting.
</description>
</property>
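The two properties above (HBASE-1027) replace fixed byte limits with fractions of the region server heap. A minimal sketch of how such fractional settings translate into byte thresholds at runtime, assuming only the property names and defaults from this diff (the surrounding class is hypothetical, not code from this commit):

    import org.apache.hadoop.conf.Configuration;

    public class MemcacheLimitSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Defaults mirror hbase-default.xml above: 40% upper, 25% lower.
        float upper = conf.getFloat("hbase.regionserver.globalMemcache.upperLimit", 0.4f);
        float lower = conf.getFloat("hbase.regionserver.globalMemcache.lowerLimit", 0.25f);
        long maxHeap = Runtime.getRuntime().maxMemory();
        long blockUpdatesAt = (long) (maxHeap * upper); // block new updates past this
        long flushDownTo = (long) (maxHeap * lower);    // forced flushes stop here
        System.out.println("block at " + blockUpdatesAt + ", flush down to " + flushDownTo);
      }
    }

On a 1 GB heap the defaults work out to roughly 410 MB and 256 MB, close to the old hard-coded 512 MB/256 MB values.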
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/Chore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/Chore.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/Chore.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/Chore.java Sun Dec 7 19:20:43 2008
@@ -59,19 +59,21 @@
this.sleeper.sleep();
}
this.sleeper.sleep();
- while(!this.stop.get()) {
+ while (!this.stop.get()) {
+ long startTime = System.currentTimeMillis();
try {
- long startTime = System.currentTimeMillis();
chore();
- this.sleeper.sleep(startTime);
} catch (Exception e) {
LOG.error("Caught exception", e);
+ if (this.stop.get()) {
+ continue;
+ }
}
+ this.sleeper.sleep(startTime);
}
} catch (Throwable t) {
LOG.fatal("Caught error. Starting shutdown.", t);
this.stop.set(true);
-
} finally {
LOG.info(getName() + " exiting");
}
@@ -97,4 +99,4 @@
protected void sleep() {
this.sleeper.sleep();
}
-}
\ No newline at end of file
+}
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java Sun Dec 7 19:20:43 2008
@@ -50,7 +50,7 @@
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.ipc.HbaseRPC;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
import org.apache.hadoop.hbase.util.Writables;
@@ -192,7 +192,7 @@
masterLocation = new HServerAddress(this.conf.get(MASTER_ADDRESS,
DEFAULT_MASTER_ADDRESS));
try {
- HMasterInterface tryMaster = (HMasterInterface)HbaseRPC.getProxy(
+ HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
masterLocation.getInetSocketAddress(), this.conf);
@@ -732,7 +732,7 @@
server = this.servers.get(regionServer.toString());
if (server == null) { // Get a connection
try {
- server = (HRegionInterface)HbaseRPC.waitForProxy(
+ server = (HRegionInterface)HBaseRPC.waitForProxy(
serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
regionServer.getInetSocketAddress(), this.conf,
this.maxRPCAttempts);
@@ -954,7 +954,7 @@
void close(boolean stopProxy) {
if (master != null) {
if (stopProxy) {
- HbaseRPC.stopProxy(master);
+ HBaseRPC.stopProxy(master);
}
master = null;
masterChecked = false;
@@ -962,7 +962,7 @@
if (stopProxy) {
synchronized (servers) {
for (HRegionInterface i: servers.values()) {
- HbaseRPC.stopProxy(i);
+ HBaseRPC.stopProxy(i);
}
}
}
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/client/HTable.java Sun Dec 7 19:20:43 2008
@@ -487,7 +487,7 @@
* at a specified timestamp
*
* @param row row key
- * @param ts timestamp
+ * @param timestamp timestamp
* @param numVersions number of versions to return
* @return RowResult is empty if row does not exist.
* @throws IOException
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/io/BloomFilterMapFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/io/BloomFilterMapFile.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/io/BloomFilterMapFile.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/io/BloomFilterMapFile.java Sun Dec 7 19:20:43 2008
@@ -75,8 +75,8 @@
throws IOException {
Path filterFile = new Path(dirName, BLOOMFILTER_FILE_NAME);
if(!fs.exists(filterFile)) {
- throw new FileNotFoundException("Could not find bloom filter: " +
- filterFile);
+ LOG.warn("FileNotFound: " + filterFile + "; proceeding without");
+ return null;
}
BloomFilter filter = new BloomFilter();
FSDataInputStream in = fs.open(filterFile);
@@ -180,13 +180,19 @@
* If we fix the number of hash functions and know the number of
* entries, then the optimal vector size m = (k * n) / ln(2)
*/
- this.bloomFilter = new BloomFilter(
+ BloomFilter f = null;
+ try {
+ f = new BloomFilter(
(int) Math.ceil(
(DEFAULT_NUMBER_OF_HASH_FUNCTIONS * (1.0 * nrows)) /
Math.log(2.0)),
(int) DEFAULT_NUMBER_OF_HASH_FUNCTIONS,
Hash.getHashType(conf)
- );
+ );
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Failed creating bloomfilter; proceeding without", e);
+ }
+ this.bloomFilter = f;
} else {
this.bloomFilter = null;
}
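As a worked instance of the sizing formula kept in the comment above, m = (k * n) / ln(2): assuming k = 4 hash functions (the concrete value is whatever DEFAULT_NUMBER_OF_HASH_FUNCTIONS holds) and n = 1,000,000 rows, the vector needs about 5.77 million bits. A standalone sketch of the same arithmetic:

    public class BloomSizingSketch {
      public static void main(String[] args) {
        int k = 4;       // assumed hash function count
        int n = 1000000; // expected number of keys
        // Same expression as the Writer constructor above.
        int m = (int) Math.ceil((k * (1.0 * n)) / Math.log(2.0));
        System.out.println("optimal vector size m = " + m + " bits"); // 5770781
      }
    }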
Added: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=724238&view=auto
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (added)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Sun Dec 7 19:20:43 2008
@@ -0,0 +1,844 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.net.ConnectException;
+
+import java.io.IOException;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FilterInputStream;
+import java.io.InputStream;
+
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** A client for an IPC service. IPC calls take a single {@link Writable} as a
+ * parameter, and return a {@link Writable} as their value. A service runs on
+ * a port and is defined by a parameter class and a value class.
+ *
+ * <p>This is the org.apache.hadoop.ipc.Client renamed as HBaseClient and
+ * moved into this package so it can access package-private methods.
+ *
+ * @see HBaseServer
+ */
+public class HBaseClient {
+
+ public static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
+ private Hashtable<ConnectionId, Connection> connections =
+ new Hashtable<ConnectionId, Connection>();
+
+ private Class<? extends Writable> valueClass; // class of call values
+ private int counter; // counter for call ids
+ private AtomicBoolean running = new AtomicBoolean(true); // if client runs
+ final private Configuration conf;
+ final private int maxIdleTime; //connections will be culled if idle for
+ //maxIdleTime msecs
+ final private int maxRetries; //the max. no. of retries for socket connections
+ private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
+ private int pingInterval; // how often sends ping to the server in msecs
+
+ private SocketFactory socketFactory; // how to create sockets
+ private int refCount = 1;
+
+ final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
+ final static int DEFAULT_PING_INTERVAL = 60000; // 1 min
+ final static int PING_CALL_ID = -1;
+
+ /**
+ * set the ping interval value in configuration
+ *
+ * @param conf Configuration
+ * @param pingInterval the ping interval
+ */
+ final public static void setPingInterval(Configuration conf, int pingInterval) {
+ conf.setInt(PING_INTERVAL_NAME, pingInterval);
+ }
+
+ /**
+ * Get the ping interval from configuration;
+ * If not set in the configuration, return the default value.
+ *
+ * @param conf Configuration
+ * @return the ping interval
+ */
+ final static int getPingInterval(Configuration conf) {
+ return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
+ }
+
+ /**
+ * Increment this client's reference count
+ *
+ */
+ synchronized void incCount() {
+ refCount++;
+ }
+
+ /**
+ * Decrement this client's reference count
+ *
+ */
+ synchronized void decCount() {
+ refCount--;
+ }
+
+ /**
+ * Return if this client has no reference
+ *
+ * @return true if this client has no reference; false otherwise
+ */
+ synchronized boolean isZeroReference() {
+ return refCount==0;
+ }
+
+ /** A call waiting for a value. */
+ private class Call {
+ int id; // call id
+ Writable param; // parameter
+ Writable value; // value, null if error
+ IOException error; // exception, null if value
+ boolean done; // true when call is done
+
+ protected Call(Writable param) {
+ this.param = param;
+ synchronized (HBaseClient.this) {
+ this.id = counter++;
+ }
+ }
+
+ /** Indicate when the call is complete and the
+ * value or error are available. Notifies by default. */
+ protected synchronized void callComplete() {
+ this.done = true;
+ notify(); // notify caller
+ }
+
+ /** Set the exception when there is an error.
+ * Notify the caller the call is done.
+ *
+ * @param error exception thrown by the call; either local or remote
+ */
+ public synchronized void setException(IOException error) {
+ this.error = error;
+ callComplete();
+ }
+
+ /** Set the return value when there is no error.
+ * Notify the caller the call is done.
+ *
+ * @param value return value of the call.
+ */
+ public synchronized void setValue(Writable value) {
+ this.value = value;
+ callComplete();
+ }
+ }
+
+ /** Thread that reads responses and notifies callers. Each connection owns a
+ * socket connected to a remote address. Calls are multiplexed through this
+ * socket: responses may be delivered out of order. */
+ private class Connection extends Thread {
+ private ConnectionId remoteId;
+ private Socket socket = null; // connected socket
+ private DataInputStream in;
+ private DataOutputStream out;
+
+ // currently active calls
+ private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
+ private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
+ private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
+ private IOException closeException; // close reason
+
+ public Connection(InetSocketAddress address) throws IOException {
+ this(new ConnectionId(address, null));
+ }
+
+ public Connection(ConnectionId remoteId) throws IOException {
+ if (remoteId.getAddress().isUnresolved()) {
+ throw new UnknownHostException("unknown host: " +
+ remoteId.getAddress().getHostName());
+ }
+ this.remoteId = remoteId;
+ UserGroupInformation ticket = remoteId.getTicket();
+ this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
+ remoteId.getAddress().toString() +
+ " from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
+ this.setDaemon(true);
+ }
+
+ /** Update lastActivity with the current time. */
+ private void touch() {
+ lastActivity.set(System.currentTimeMillis());
+ }
+
+ /**
+ * Add a call to this connection's call queue and notify
+ * a listener; synchronized.
+ * Returns false if called during shutdown.
+ * @param call to add
+ * @return true if the call was added.
+ */
+ private synchronized boolean addCall(Call call) {
+ if (shouldCloseConnection.get())
+ return false;
+ calls.put(call.id, call);
+ notify();
+ return true;
+ }
+
+ /** This class sends a ping to the remote side when timeout on
+ * reading. If no failure is detected, it retries until at least
+ * a byte is read.
+ */
+ private class PingInputStream extends FilterInputStream {
+ /* constructor */
+ protected PingInputStream(InputStream in) {
+ super(in);
+ }
+
+ /* Process timeout exception
+ * if the connection is not going to be closed, send a ping.
+ * otherwise, throw the timeout exception.
+ */
+ private void handleTimeout(SocketTimeoutException e) throws IOException {
+ if (shouldCloseConnection.get() || !running.get()) {
+ throw e;
+ } else {
+ sendPing();
+ }
+ }
+
+ /** Read a byte from the stream.
+ * Send a ping if timeout on read. Retries if no failure is detected
+ * until a byte is read.
+ * @throws IOException for any IO problem other than socket timeout
+ */
+ public int read() throws IOException {
+ do {
+ try {
+ return super.read();
+ } catch (SocketTimeoutException e) {
+ handleTimeout(e);
+ }
+ } while (true);
+ }
+
+ /** Read bytes into a buffer starting from offset <code>off</code>
+ * Send a ping if timeout on read. Retries if no failure is detected
+ * until a byte is read.
+ *
+ * @return the total number of bytes read; -1 if the connection is closed.
+ */
+ public int read(byte[] buf, int off, int len) throws IOException {
+ do {
+ try {
+ return super.read(buf, off, len);
+ } catch (SocketTimeoutException e) {
+ handleTimeout(e);
+ }
+ } while (true);
+ }
+ }
+
+ /** Connect to the server and set up the I/O streams. It then sends
+ * a header to the server and starts
+ * the connection thread that waits for responses.
+ */
+ private synchronized void setupIOstreams() {
+ if (socket != null || shouldCloseConnection.get()) {
+ return;
+ }
+
+ short ioFailures = 0;
+ short timeoutFailures = 0;
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to "+remoteId.getAddress());
+ }
+ while (true) {
+ try {
+ this.socket = socketFactory.createSocket();
+ this.socket.setTcpNoDelay(tcpNoDelay);
+ // connection time out is 20s
+ this.socket.connect(remoteId.getAddress(), 20000);
+ this.socket.setSoTimeout(pingInterval);
+ break;
+ } catch (SocketTimeoutException toe) {
+ /* The max number of retries is 45,
+ * which amounts to 20s*45 = 15 minutes retries.
+ */
+ handleConnectionFailure(timeoutFailures++, 45, toe);
+ } catch (IOException ie) {
+ handleConnectionFailure(ioFailures++, maxRetries, ie);
+ }
+ }
+ this.in = new DataInputStream(new BufferedInputStream
+ (new PingInputStream(NetUtils.getInputStream(socket))));
+ this.out = new DataOutputStream
+ (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
+ writeHeader();
+
+ // update last activity time
+ touch();
+
+ // start the receiver thread after the socket connection has been set up
+ start();
+ } catch (IOException e) {
+ markClosed(e);
+ close();
+ }
+ }
+
+ /* Handle connection failures
+ *
+ * If the current number of retries is equal to the max number of retries,
+ * stop retrying and throw the exception; Otherwise backoff 1 second and
+ * try connecting again.
+ *
+ * This method is only called from inside setupIOstreams(), which is
+ * synchronized. Hence the sleep is synchronized; the locks will be retained.
+ *
+ * @param curRetries current number of retries
+ * @param maxRetries max number of retries allowed
+ * @param ioe failure reason
+ * @throws IOException if max number of retries is reached
+ */
+ private void handleConnectionFailure(
+ int curRetries, int maxRetries, IOException ioe) throws IOException {
+ // close the current connection
+ try {
+ socket.close();
+ } catch (IOException e) {
+ LOG.warn("Not able to close a socket", e);
+ }
+ // set socket to null so that the next call to setupIOstreams
+ // can start the process of connect all over again.
+ socket = null;
+
+ // throw the exception if the maximum number of retries is reached
+ if (curRetries >= maxRetries) {
+ throw ioe;
+ }
+
+ // otherwise back off and retry
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {}
+
+ LOG.info("Retrying connect to server: " + remoteId.getAddress() +
+ ". Already tried " + curRetries + " time(s).");
+ }
+
+ /* Write the header for each connection
+ * Out is not synchronized because only the first thread does this.
+ */
+ private void writeHeader() throws IOException {
+ out.write(HBaseServer.HEADER.array());
+ out.write(HBaseServer.CURRENT_VERSION);
+ //When there are more fields we can have ConnectionHeader Writable.
+ DataOutputBuffer buf = new DataOutputBuffer();
+ ObjectWritable.writeObject(buf, remoteId.getTicket(),
+ UserGroupInformation.class, conf);
+ int bufLen = buf.getLength();
+ out.writeInt(bufLen);
+ out.write(buf.getData(), 0, bufLen);
+ }
+
+ /* wait till someone signals us to start reading RPC response or
+ * it is idle too long, it is marked as to be closed,
+ * or the client is marked as not running.
+ *
+ * Return true if it is time to read a response; false otherwise.
+ */
+ private synchronized boolean waitForWork() {
+ if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
+ long timeout = maxIdleTime-
+ (System.currentTimeMillis()-lastActivity.get());
+ if (timeout>0) {
+ try {
+ wait(timeout);
+ } catch (InterruptedException e) {}
+ }
+ }
+
+ if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
+ return true;
+ } else if (shouldCloseConnection.get()) {
+ return false;
+ } else if (calls.isEmpty()) { // idle connection closed or stopped
+ markClosed(null);
+ return false;
+ } else { // get stopped but there are still pending requests
+ markClosed((IOException)new IOException().initCause(
+ new InterruptedException()));
+ return false;
+ }
+ }
+
+ public InetSocketAddress getRemoteAddress() {
+ return remoteId.getAddress();
+ }
+
+ /* Send a ping to the server if the time elapsed
+ * since last I/O activity is equal to or greater than the ping interval
+ */
+ private synchronized void sendPing() throws IOException {
+ long curTime = System.currentTimeMillis();
+ if ( curTime - lastActivity.get() >= pingInterval) {
+ lastActivity.set(curTime);
+ synchronized (out) {
+ out.writeInt(PING_CALL_ID);
+ out.flush();
+ }
+ }
+ }
+
+ public void run() {
+ if (LOG.isDebugEnabled())
+ LOG.debug(getName() + ": starting, having connections "
+ + connections.size());
+
+ while (waitForWork()) {//wait here for work - read or close connection
+ receiveResponse();
+ }
+
+ close();
+
+ if (LOG.isDebugEnabled())
+ LOG.debug(getName() + ": stopped, remaining connections "
+ + connections.size());
+ }
+
+ /** Initiates a call by sending the parameter to the remote server.
+ * Note: this is not called from the Connection thread, but by other
+ * threads.
+ */
+ public void sendParam(Call call) {
+ if (shouldCloseConnection.get()) {
+ return;
+ }
+
+ DataOutputBuffer d=null;
+ try {
+ synchronized (this.out) {
+ if (LOG.isDebugEnabled())
+ LOG.debug(getName() + " sending #" + call.id);
+
+ //for serializing the
+ //data to be written
+ d = new DataOutputBuffer();
+ d.writeInt(call.id);
+ call.param.write(d);
+ byte[] data = d.getData();
+ int dataLength = d.getLength();
+ out.writeInt(dataLength); //first put the data length
+ out.write(data, 0, dataLength);//write the data
+ out.flush();
+ }
+ } catch(IOException e) {
+ markClosed(e);
+ } finally {
+ //the buffer is just an in-memory buffer, but it is still polite to
+ // close early
+ IOUtils.closeStream(d);
+ }
+ }
+
+ /* Receive a response.
+ * Because there is only one receiver, no synchronization on in is needed.
+ */
+ private void receiveResponse() {
+ if (shouldCloseConnection.get()) {
+ return;
+ }
+ touch();
+
+ try {
+ int id = in.readInt(); // try to read an id
+
+ if (LOG.isDebugEnabled())
+ LOG.debug(getName() + " got value #" + id);
+
+ Call call = calls.remove(id);
+
+ boolean isError = in.readBoolean(); // read if error
+ if (isError) {
+ call.setException(new RemoteException( WritableUtils.readString(in),
+ WritableUtils.readString(in)));
+ } else {
+ Writable value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+ value.readFields(in); // read value
+ call.setValue(value);
+ }
+ } catch (IOException e) {
+ markClosed(e);
+ }
+ }
+
+ private synchronized void markClosed(IOException e) {
+ if (shouldCloseConnection.compareAndSet(false, true)) {
+ closeException = e;
+ notifyAll();
+ }
+ }
+
+ /** Close the connection. */
+ private synchronized void close() {
+ if (!shouldCloseConnection.get()) {
+ LOG.error("The connection is not in the closed state");
+ return;
+ }
+
+ // release the resources
+ // first thing to do: take the connection out of the connection list
+ synchronized (connections) {
+ if (connections.get(remoteId) == this) {
+ connections.remove(remoteId);
+ }
+ }
+
+ // close the streams and therefore the socket
+ IOUtils.closeStream(out);
+ IOUtils.closeStream(in);
+
+ // clean up all calls
+ if (closeException == null) {
+ if (!calls.isEmpty()) {
+ LOG.warn(
+ "A connection is closed for no cause and calls are not empty");
+
+ // clean up calls anyway
+ closeException = new IOException("Unexpected closed connection");
+ cleanupCalls();
+ }
+ } else {
+ // log the info
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("closing ipc connection to " + remoteId.address + ": " +
+ closeException.getMessage(),closeException);
+ }
+
+ // cleanup calls
+ cleanupCalls();
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug(getName() + ": closed");
+ }
+
+ /* Cleanup all calls and mark them as done */
+ private void cleanupCalls() {
+ Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
+ while (itor.hasNext()) {
+ Call c = itor.next().getValue();
+ c.setException(closeException); // local exception
+ itor.remove();
+ }
+ }
+ }
+
+ /** Call implementation used for parallel calls. */
+ private class ParallelCall extends Call {
+ private ParallelResults results;
+ private int index;
+
+ public ParallelCall(Writable param, ParallelResults results, int index) {
+ super(param);
+ this.results = results;
+ this.index = index;
+ }
+
+ /** Deliver result to result collector. */
+ protected void callComplete() {
+ results.callComplete(this);
+ }
+ }
+
+ /** Result collector for parallel calls. */
+ private static class ParallelResults {
+ private Writable[] values;
+ private int size;
+ private int count;
+
+ public ParallelResults(int size) {
+ this.values = new Writable[size];
+ this.size = size;
+ }
+
+ /** Collect a result. */
+ public synchronized void callComplete(ParallelCall call) {
+ values[call.index] = call.value; // store the value
+ count++; // count it
+ if (count == size) // if all values are in
+ notify(); // then notify waiting caller
+ }
+ }
+
+ /** Construct an IPC client whose values are of the given {@link Writable}
+ * class. */
+ public HBaseClient(Class<? extends Writable> valueClass, Configuration conf,
+ SocketFactory factory) {
+ this.valueClass = valueClass;
+ this.maxIdleTime =
+ conf.getInt("ipc.client.connection.maxidletime", 10000); //10s
+ this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
+ this.tcpNoDelay = conf.getBoolean("ipc.client.tcpnodelay", false);
+ this.pingInterval = getPingInterval(conf);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The ping interval is" + this.pingInterval + "ms.");
+ }
+ this.conf = conf;
+ this.socketFactory = factory;
+ }
+
+ /**
+ * Construct an IPC client with the default SocketFactory
+ * @param valueClass
+ * @param conf
+ */
+ public HBaseClient(Class<? extends Writable> valueClass, Configuration conf) {
+ this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
+ }
+
+ /** Return the socket factory of this client
+ *
+ * @return this client's socket factory
+ */
+ SocketFactory getSocketFactory() {
+ return socketFactory;
+ }
+
+ /** Stop all threads related to this client. No further calls may be made
+ * using this client. */
+ public void stop() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping client");
+ }
+
+ if (!running.compareAndSet(true, false)) {
+ return;
+ }
+
+ // wake up all connections
+ synchronized (connections) {
+ for (Connection conn : connections.values()) {
+ conn.interrupt();
+ }
+ }
+
+ // wait until all connections are closed
+ while (!connections.isEmpty()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ /** Make a call, passing <code>param</code>, to the IPC server running at
+ * <code>address</code>, returning the value. Throws exceptions if there are
+ * network problems or if the remote code threw an exception. */
+ public Writable call(Writable param, InetSocketAddress address)
+ throws InterruptedException, IOException {
+ return call(param, address, null);
+ }
+
+ public Writable call(Writable param, InetSocketAddress addr,
+ UserGroupInformation ticket)
+ throws InterruptedException, IOException {
+ Call call = new Call(param);
+ Connection connection = getConnection(addr, ticket, call);
+ connection.sendParam(call); // send the parameter
+ synchronized (call) {
+ while (!call.done) {
+ try {
+ call.wait(); // wait for the result
+ } catch (InterruptedException ignored) {}
+ }
+
+ if (call.error != null) {
+ if (call.error instanceof RemoteException) {
+ call.error.fillInStackTrace();
+ throw call.error;
+ } else { // local exception
+ throw wrapException(addr, call.error);
+ }
+ } else {
+ return call.value;
+ }
+ }
+ }
+
+ /**
+ * Take an IOException and the address we were trying to connect to
+ * and return an IOException with the input exception as the cause.
+ * The new exception provides the stack trace of the place where
+ * the exception is thrown and some extra diagnostics information.
+ * If the exception is ConnectException or SocketTimeoutException,
+ * return a new one of the same type; Otherwise return an IOException.
+ *
+ * @param addr target address
+ * @param exception the relevant exception
+ * @return an exception to throw
+ */
+ private IOException wrapException(InetSocketAddress addr,
+ IOException exception) {
+ if (exception instanceof ConnectException) {
+ //connection refused; include the host:port in the error
+ return (ConnectException)new ConnectException(
+ "Call to " + addr + " failed on connection exception: " + exception)
+ .initCause(exception);
+ } else if (exception instanceof SocketTimeoutException) {
+ return (SocketTimeoutException)new SocketTimeoutException(
+ "Call to " + addr + " failed on socket timeout exception: "
+ + exception).initCause(exception);
+ } else {
+ return (IOException)new IOException(
+ "Call to " + addr + " failed on local exception: " + exception)
+ .initCause(exception);
+
+ }
+ }
+
+ /** Makes a set of calls in parallel. Each parameter is sent to the
+ * corresponding address. When all values are available, or have timed out
+ * or errored, the collected results are returned in an array. The array
+ * contains nulls for calls that timed out or errored. */
+ public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
+ throws IOException {
+ if (addresses.length == 0) return new Writable[0];
+
+ ParallelResults results = new ParallelResults(params.length);
+ synchronized (results) {
+ for (int i = 0; i < params.length; i++) {
+ ParallelCall call = new ParallelCall(params[i], results, i);
+ try {
+ Connection connection = getConnection(addresses[i], null, call);
+ connection.sendParam(call); // send each parameter
+ } catch (IOException e) {
+ // log errors
+ LOG.info("Calling "+addresses[i]+" caught: " +
+ e.getMessage(),e);
+ results.size--; // wait for one fewer result
+ }
+ }
+ while (results.count != results.size) {
+ try {
+ results.wait(); // wait for all results
+ } catch (InterruptedException e) {}
+ }
+
+ return results.values;
+ }
+ }
+
+ /** Get a connection from the pool, or create a new one and add it to the
+ * pool. Connections to a given host/port are reused. */
+ private Connection getConnection(InetSocketAddress addr,
+ UserGroupInformation ticket,
+ Call call)
+ throws IOException {
+ if (!running.get()) {
+ // the client is stopped
+ throw new IOException("The client is stopped");
+ }
+ Connection connection;
+ /* we could avoid this allocation for each RPC by having a
+ * ConnectionId object with a set() method. We need to manage the
+ * refs for keys in the HashMap properly. For now it's ok.
+ */
+ ConnectionId remoteId = new ConnectionId(addr, ticket);
+ do {
+ synchronized (connections) {
+ connection = connections.get(remoteId);
+ if (connection == null) {
+ connection = new Connection(remoteId);
+ connections.put(remoteId, connection);
+ }
+ }
+ } while (!connection.addCall(call));
+
+ //we don't invoke the method below inside "synchronized (connections)"
+ //block above. The reason for that is if the server happens to be slow,
+ //it will take longer to establish a connection and that will slow the
+ //entire system down.
+ connection.setupIOstreams();
+ return connection;
+ }
+
+ /**
+ * This class holds the address and the user ticket. The client connections
+ * to servers are uniquely identified by <remoteAddress, ticket>
+ */
+ private static class ConnectionId {
+ InetSocketAddress address;
+ UserGroupInformation ticket;
+
+ ConnectionId(InetSocketAddress address, UserGroupInformation ticket) {
+ this.address = address;
+ this.ticket = ticket;
+ }
+
+ InetSocketAddress getAddress() {
+ return address;
+ }
+ UserGroupInformation getTicket() {
+ return ticket;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ConnectionId) {
+ ConnectionId id = (ConnectionId) obj;
+ return address.equals(id.address) && ticket == id.ticket;
+ // Note: ticket is a reference comparison.
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return address.hashCode() ^ System.identityHashCode(ticket);
+ }
+ }
+}
\ No newline at end of file
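A minimal sketch of driving HBaseClient directly, assuming a compatible HBaseServer is listening on the given port; normally the HBaseRPC proxies below make these calls for you, and Text here merely stands in for whatever Writable the service actually speaks:

    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.ipc.HBaseClient;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class ClientSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Shorten the ping-on-read-timeout interval from its one minute default.
        HBaseClient.setPingInterval(conf, 30000);
        HBaseClient client = new HBaseClient(Text.class, conf);
        InetSocketAddress addr = new InetSocketAddress("localhost", 60020);
        // Blocks until the response arrives; connections are pooled per <address, ticket>.
        Writable value = client.call(new Text("param"), addr);
        System.out.println(value);
        client.stop(); // no further calls may be made once stopped
      }
    }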
Added: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=724238&view=auto
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (added)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Sun Dec 7 19:20:43 2008
@@ -0,0 +1,678 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/** A simple RPC mechanism.
+ *
+ * This is a local hbase copy of the hadoop RPC so we can do things like
+ * address HADOOP-414 for hbase-only and try other hbase-specific
+ * optimizations like using our own version of ObjectWritable. Class has been
+ * renamed to avoid confusing it w/ hadoop versions.
+ * <p>
+ *
+ *
+ * A <i>protocol</i> is a Java interface. All parameters and return types must
+ * be one of:
+ *
+ * <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>,
+ * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
+ * <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
+ *
+ * <li>a {@link String}; or</li>
+ *
+ * <li>a {@link Writable}; or</li>
+ *
+ * <li>an array of the above types</li> </ul>
+ *
+ * All methods in the protocol should throw only IOException. No field data of
+ * the protocol instance is transmitted.
+ */
+public class HBaseRPC {
+ // Leave the log category in the hadoop ipc package but keep the class name.
+ // Do this so that we don't get logging of this class's invocations when we
+ // blanket-enable DEBUG on the o.a.h.h. package.
+ private static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC");
+
+ private HBaseRPC() {
+ super();
+ } // no public ctor
+
+
+ /** A method invocation, including the method name and its parameters.*/
+ private static class Invocation implements Writable, Configurable {
+ // Here, for hbase, we maintain two static maps of method names to code and
+ // vice versa.
+ private static final Map<Byte, String> CODE_TO_METHODNAME =
+ new HashMap<Byte, String>();
+ private static final Map<String, Byte> METHODNAME_TO_CODE =
+ new HashMap<String, Byte>();
+ // Special code that means 'not-encoded'.
+ private static final byte NOT_ENCODED = 0;
+ static {
+ byte code = NOT_ENCODED + 1;
+ code = addToMap(VersionedProtocol.class, code);
+ code = addToMap(HMasterInterface.class, code);
+ code = addToMap(HMasterRegionInterface.class, code);
+ code = addToMap(TransactionalRegionInterface.class, code);
+ }
+ // End of hbase modifications.
+
+ private String methodName;
+ @SuppressWarnings("unchecked")
+ private Class[] parameterClasses;
+ private Object[] parameters;
+ private Configuration conf;
+
+ /** default constructor */
+ public Invocation() {
+ super();
+ }
+
+ /**
+ * @param method
+ * @param parameters
+ */
+ public Invocation(Method method, Object[] parameters) {
+ this.methodName = method.getName();
+ this.parameterClasses = method.getParameterTypes();
+ this.parameters = parameters;
+ }
+
+ /** @return The name of the method invoked. */
+ public String getMethodName() { return methodName; }
+
+ /** @return The parameter classes. */
+ @SuppressWarnings("unchecked")
+ public Class[] getParameterClasses() { return parameterClasses; }
+
+ /** @return The parameter instances. */
+ public Object[] getParameters() { return parameters; }
+
+ public void readFields(DataInput in) throws IOException {
+ byte code = in.readByte();
+ methodName = CODE_TO_METHODNAME.get(Byte.valueOf(code));
+ parameters = new Object[in.readInt()];
+ parameterClasses = new Class[parameters.length];
+ HbaseObjectWritable objectWritable = new HbaseObjectWritable();
+ for (int i = 0; i < parameters.length; i++) {
+ parameters[i] = HbaseObjectWritable.readObject(in, objectWritable,
+ this.conf);
+ parameterClasses[i] = objectWritable.getDeclaredClass();
+ }
+ }
+
+ public void write(DataOutput out) throws IOException {
+ writeMethodNameCode(out, this.methodName);
+ out.writeInt(parameterClasses.length);
+ for (int i = 0; i < parameterClasses.length; i++) {
+ HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
+ conf);
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buffer = new StringBuilder(256);
+ buffer.append(methodName);
+ buffer.append("(");
+ for (int i = 0; i < parameters.length; i++) {
+ if (i != 0)
+ buffer.append(", ");
+ buffer.append(parameters[i]);
+ }
+ buffer.append(")");
+ return buffer.toString();
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ // Hbase additions.
+ private static void addToMap(final String name, final byte code) {
+ if (METHODNAME_TO_CODE.containsKey(name)) {
+ return;
+ }
+ METHODNAME_TO_CODE.put(name, Byte.valueOf(code));
+ CODE_TO_METHODNAME.put(Byte.valueOf(code), name);
+ }
+
+ /*
+ * @param c Class whose methods we'll add to the map of methods to codes
+ * (and vice versa).
+ * @param code Current state of the byte code.
+ * @return State of <code>code</code> when this method is done.
+ */
+ private static byte addToMap(final Class<?> c, final byte code) {
+ byte localCode = code;
+ Method [] methods = c.getMethods();
+ // There are no guarantees about the order in which items are returned in
+ // so do a sort (Was seeing that sort was one way on one server and then
+ // another on different server).
+ Arrays.sort(methods, new Comparator<Method>() {
+ public int compare(Method left, Method right) {
+ return left.getName().compareTo(right.getName());
+ }
+ });
+ for (int i = 0; i < methods.length; i++) {
+ addToMap(methods[i].getName(), localCode++);
+ }
+ return localCode;
+ }
+
+ /*
+ * Write out the code byte for the passed method name.
+ * @param out
+ * @param methodname
+ * @throws IOException
+ */
+ static void writeMethodNameCode(final DataOutput out, final String methodname)
+ throws IOException {
+ Byte code = METHODNAME_TO_CODE.get(methodname);
+ if (code == null) {
+ LOG.error("Unsupported type " + methodname);
+ throw new UnsupportedOperationException("No code for unexpected " +
+ methodname);
+ }
+ out.writeByte(code.byteValue());
+ }
+ // End of hbase additions.
+ }
+
+ /* Cache a client using its socket factory as the hash key */
+ static private class ClientCache {
+ private Map<SocketFactory, HBaseClient> clients =
+ new HashMap<SocketFactory, HBaseClient>();
+
+ /**
+ * Construct & cache an IPC client with the user-provided SocketFactory
+ * if no cached client exists.
+ *
+ * @param conf Configuration
+ * @return an IPC client
+ */
+ private synchronized HBaseClient getClient(Configuration conf,
+ SocketFactory factory) {
+ // Construct & cache client. The configuration is only used for timeout,
+ // and Clients have connection pools. So we can either (a) lose some
+ // connection pooling and leak sockets, or (b) use the same timeout for all
+ // configurations. Since the IPC is usually intended globally, not
+ // per-job, we choose (b).
+ HBaseClient client = clients.get(factory);
+ if (client == null) {
+ // Make an hbase client instead of hadoop Client.
+ client = new HBaseClient(HbaseObjectWritable.class, conf, factory);
+ clients.put(factory, client);
+ } else {
+ client.incCount();
+ }
+ return client;
+ }
+
+ /**
+ * Construct & cache an IPC client with the default SocketFactory
+ * if no cached client exists.
+ *
+ * @param conf Configuration
+ * @return an IPC client
+ */
+ private synchronized HBaseClient getClient(Configuration conf) {
+ return getClient(conf, SocketFactory.getDefault());
+ }
+
+ /**
+ * Stop an RPC client connection.
+ * An RPC client is closed only when its reference count becomes zero.
+ */
+ private void stopClient(HBaseClient client) {
+ synchronized (this) {
+ client.decCount();
+ if (client.isZeroReference()) {
+ clients.remove(client.getSocketFactory());
+ }
+ }
+ if (client.isZeroReference()) {
+ client.stop();
+ }
+ }
+ }
+
+ private static ClientCache CLIENTS = new ClientCache();
+
+ private static class Invoker implements InvocationHandler {
+ private InetSocketAddress address;
+ private UserGroupInformation ticket;
+ private HBaseClient client;
+ private boolean isClosed = false;
+
+ /**
+ * @param address
+ * @param ticket
+ * @param conf
+ * @param factory
+ */
+ public Invoker(InetSocketAddress address, UserGroupInformation ticket,
+ Configuration conf, SocketFactory factory) {
+ this.address = address;
+ this.ticket = ticket;
+ this.client = CLIENTS.getClient(conf, factory);
+ }
+
+ public Object invoke(@SuppressWarnings("unused") Object proxy,
+ Method method, Object[] args)
+ throws Throwable {
+ final boolean logDebug = LOG.isDebugEnabled();
+ long startTime = 0;
+ if (logDebug) {
+ startTime = System.currentTimeMillis();
+ }
+ HbaseObjectWritable value = (HbaseObjectWritable)
+ client.call(new Invocation(method, args), address, ticket);
+ if (logDebug) {
+ long callTime = System.currentTimeMillis() - startTime;
+ LOG.debug("Call: " + method.getName() + " " + callTime);
+ }
+ return value.get();
+ }
+
+ /* close the IPC client that's responsible for this invoker's RPCs */
+ synchronized private void close() {
+ if (!isClosed) {
+ isClosed = true;
+ CLIENTS.stopClient(client);
+ }
+ }
+ }
+
+ /**
+ * A version mismatch for the RPC protocol.
+ */
+ @SuppressWarnings("serial")
+ public static class VersionMismatch extends IOException {
+ private String interfaceName;
+ private long clientVersion;
+ private long serverVersion;
+
+ /**
+ * Create a version mismatch exception
+ * @param interfaceName the name of the protocol mismatch
+ * @param clientVersion the client's version of the protocol
+ * @param serverVersion the server's version of the protocol
+ */
+ public VersionMismatch(String interfaceName, long clientVersion,
+ long serverVersion) {
+ super("Protocol " + interfaceName + " version mismatch. (client = " +
+ clientVersion + ", server = " + serverVersion + ")");
+ this.interfaceName = interfaceName;
+ this.clientVersion = clientVersion;
+ this.serverVersion = serverVersion;
+ }
+
+ /**
+ * Get the interface name
+ * @return the java class name
+ * (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
+ */
+ public String getInterfaceName() {
+ return interfaceName;
+ }
+
+ /**
+ * @return the client's preferred version
+ */
+ public long getClientVersion() {
+ return clientVersion;
+ }
+
+ /**
+ * @return the server's agreed to version.
+ */
+ public long getServerVersion() {
+ return serverVersion;
+ }
+ }
+
+ /**
+ * @param protocol
+ * @param clientVersion
+ * @param addr
+ * @param conf
+ * @param maxAttempts
+ * @return proxy
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public static VersionedProtocol waitForProxy(Class protocol,
+ long clientVersion,
+ InetSocketAddress addr,
+ Configuration conf,
+ int maxAttempts
+ ) throws IOException {
+ // HBase does limited number of reconnects which is different from hadoop.
+ int reconnectAttempts = 0;
+ while (true) {
+ try {
+ return getProxy(protocol, clientVersion, addr, conf);
+ } catch (ConnectException se) { // server has not been started
+ LOG.info("Server at " + addr + " not available yet, Zzzzz...");
+ if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) {
+ LOG.info("Server at " + addr + " could not be reached after " +
+ reconnectAttempts + " tries, giving up.");
+ throw new RetriesExhaustedException(addr.toString(), "unknown".getBytes(),
+ "unknown".getBytes(), reconnectAttempts - 1,
+ new ArrayList<Throwable>());
+ }
+ } catch (SocketTimeoutException te) { // server is busy
+ LOG.info("Problem connecting to server: " + addr);
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ // IGNORE
+ }
+ }
+ }
+
+ /**
+ * Construct a client-side proxy object that implements the named protocol,
+ * talking to a server at the named address.
+ *
+ * @param protocol
+ * @param clientVersion
+ * @param addr
+ * @param conf
+ * @param factory
+ * @return proxy
+ * @throws IOException
+ */
+ public static VersionedProtocol getProxy(Class<?> protocol,
+ long clientVersion, InetSocketAddress addr, Configuration conf,
+ SocketFactory factory) throws IOException {
+ return getProxy(protocol, clientVersion, addr, null, conf, factory);
+ }
+
+ /**
+ * Construct a client-side proxy object that implements the named protocol,
+ * talking to a server at the named address.
+ *
+ * @param protocol
+ * @param clientVersion
+ * @param addr
+ * @param ticket
+ * @param conf
+ * @param factory
+ * @return proxy
+ * @throws IOException
+ */
+ public static VersionedProtocol getProxy(Class<?> protocol,
+ long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
+ Configuration conf, SocketFactory factory)
+ throws IOException {
+ VersionedProtocol proxy =
+ (VersionedProtocol) Proxy.newProxyInstance(
+ protocol.getClassLoader(), new Class[] { protocol },
+ new Invoker(addr, ticket, conf, factory));
+ long serverVersion = proxy.getProtocolVersion(protocol.getName(),
+ clientVersion);
+ if (serverVersion == clientVersion) {
+ return proxy;
+ } else {
+ throw new VersionMismatch(protocol.getName(), clientVersion,
+ serverVersion);
+ }
+ }
+
+ /**
+ * Construct a client-side proxy object with the default SocketFactory
+ *
+ * @param protocol
+ * @param clientVersion
+ * @param addr
+ * @param conf
+ * @return a proxy instance
+ * @throws IOException
+ */
+ public static VersionedProtocol getProxy(Class<?> protocol,
+ long clientVersion, InetSocketAddress addr, Configuration conf)
+ throws IOException {
+
+ return getProxy(protocol, clientVersion, addr, conf, NetUtils
+ .getDefaultSocketFactory(conf));
+ }
+
+ /**
+ * Stop this proxy and release its invoker's resource
+ * @param proxy the proxy to be stopped
+ */
+ public static void stopProxy(VersionedProtocol proxy) {
+ if (proxy!=null) {
+ ((Invoker)Proxy.getInvocationHandler(proxy)).close();
+ }
+ }
+
+ /**
+ * Expert: Make multiple, parallel calls to a set of servers.
+ *
+ * @param method
+ * @param params
+ * @param addrs
+ * @param conf
+ * @return values
+ * @throws IOException
+ */
+ public static Object[] call(Method method, Object[][] params,
+ InetSocketAddress[] addrs, Configuration conf)
+ throws IOException {
+
+ Invocation[] invocations = new Invocation[params.length];
+ for (int i = 0; i < params.length; i++)
+ invocations[i] = new Invocation(method, params[i]);
+ HBaseClient client = CLIENTS.getClient(conf);
+ try {
+ Writable[] wrappedValues = client.call(invocations, addrs);
+
+ if (method.getReturnType() == Void.TYPE) {
+ return null;
+ }
+
+ Object[] values =
+ (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
+ for (int i = 0; i < values.length; i++)
+ if (wrappedValues[i] != null)
+ values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
+
+ return values;
+ } finally {
+ CLIENTS.stopClient(client);
+ }
+ }
+
+ /**
+ * Construct a server for a protocol implementation instance listening on a
+ * port and address.
+ *
+ * @param instance
+ * @param bindAddress
+ * @param port
+ * @param conf
+ * @return Server
+ * @throws IOException
+ */
+ public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf)
+ throws IOException {
+ return getServer(instance, bindAddress, port, 1, false, conf);
+ }
+
+ /**
+ * Construct a server for a protocol implementation instance listening on a
+ * port and address.
+ *
+ * @param instance
+ * @param bindAddress
+ * @param port
+ * @param numHandlers
+ * @param verbose
+ * @param conf
+ * @return Server
+ * @throws IOException
+ */
+ public static Server getServer(final Object instance, final String bindAddress, final int port,
+ final int numHandlers,
+ final boolean verbose, Configuration conf)
+ throws IOException {
+ return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
+ }
+
+ /** An RPC Server. */
+ public static class Server extends HBaseServer {
+ private Object instance;
+ private Class<?> implementation;
+ private boolean verbose;
+
+ /**
+ * Construct an RPC server.
+ * @param instance the instance whose methods will be called
+ * @param conf the configuration to use
+ * @param bindAddress the address to bind on to listen for connection
+ * @param port the port to listen for connections on
+ * @throws IOException
+ */
+ public Server(Object instance, Configuration conf, String bindAddress, int port)
+ throws IOException {
+ this(instance, conf, bindAddress, port, 1, false);
+ }
+
+ private static String classNameBase(String className) {
+ String[] names = className.split("\\.", -1);
+ if (names == null || names.length == 0) {
+ return className;
+ }
+ return names[names.length-1];
+ }
+
+ /** Construct an RPC server.
+ * @param instance the instance whose methods will be called
+ * @param conf the configuration to use
+ * @param bindAddress the address to bind on to listen for connection
+ * @param port the port to listen for connections on
+ * @param numHandlers the number of method handler threads to run
+ * @param verbose whether each call should be logged
+ * @throws IOException
+ */
+ public Server(Object instance, Configuration conf, String bindAddress, int port,
+ int numHandlers, boolean verbose) throws IOException {
+ super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
+ this.instance = instance;
+ this.implementation = instance.getClass();
+ this.verbose = verbose;
+ }
+
+ @Override
+ public Writable call(Writable param, long receivedTime) throws IOException {
+ try {
+ Invocation call = (Invocation)param;
+ if (verbose) log("Call: " + call);
+ Method method =
+ implementation.getMethod(call.getMethodName(),
+ call.getParameterClasses());
+
+ long startTime = System.currentTimeMillis();
+ Object value = method.invoke(instance, call.getParameters());
+ int processingTime = (int) (System.currentTimeMillis() - startTime);
+ int qTime = (int) (startTime-receivedTime);
+ LOG.debug("Served: " + call.getMethodName() +
+ " queueTime= " + qTime +
+ " processingTime= " + processingTime);
+ rpcMetrics.rpcQueueTime.inc(qTime);
+ rpcMetrics.rpcProcessingTime.inc(processingTime);
+
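+ // Update the per-method processing-time metric, creating it the
+ // first time this method name is seen.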
+ MetricsTimeVaryingRate m = rpcMetrics.metricsList.get(call.getMethodName());
+
+ if (m == null) {
+ m = new MetricsTimeVaryingRate(call.getMethodName());
+ rpcMetrics.metricsList.put(call.getMethodName(), m);
+ }
+ m.inc(processingTime);
+
+ if (verbose) log("Return: "+value);
+
+ return new HbaseObjectWritable(method.getReturnType(), value);
+
+ } catch (InvocationTargetException e) {
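+ // Unwrap the reflection wrapper: rethrow IOExceptions as-is and
+ // convert anything else to an IOException, preserving the stack trace.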
+ Throwable target = e.getTargetException();
+ if (target instanceof IOException) {
+ throw (IOException)target;
+ } else {
+ IOException ioe = new IOException(target.toString());
+ ioe.setStackTrace(target.getStackTrace());
+ throw ioe;
+ }
+ } catch (Throwable e) {
+ IOException ioe = new IOException(e.toString());
+ ioe.setStackTrace(e.getStackTrace());
+ throw ioe;
+ }
+ }
+ }
+
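+ /**
+ * Log a call or return value at INFO level, truncated to 55 characters
+ * to keep verbose call logging readable.
+ * @param value the message to log
+ */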
+ private static void log(String value) {
+ if (value != null && value.length() > 55)
+ value = value.substring(0, 55) + "...";
+ LOG.info(value);
+ }
+}
Modified: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java?rev=724238&r1=724237&r2=724238&view=diff
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java (original)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java Sun Dec 7 19:20:43 2008
@@ -63,7 +63,8 @@
* <li>Version 11: Changed getClosestRowBefore signature.</li>
* <li>Version 12: HServerLoad extensions (HBASE-1018).</li>
* <li>Version 13: HBASE-847</li>
+ * <li>Version 14: HBASE-900</li>
* </ul>
*/
- public static final long versionID = 13L;
+ public static final long versionID = 14L;
}
Added: hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java?rev=724238&view=auto
==============================================================================
--- hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java (added)
+++ hadoop/hbase/branches/0.19_on_hadoop_0.18/src/java/org/apache/hadoop/hbase/ipc/HBaseRpcMetrics.java Sun Dec 7 19:20:43 2008
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
+
+/**
+ *
+ * This class maintains the various RPC statistics
+ * and publishes them through the metrics interfaces.
+ * <p>
+ * This class has a number of metrics variables that are publicly accessible;
+ * these variables (objects) have methods to update their values;
+ * for example:
+ * <p> {@link #rpcQueueTime}.inc(time)
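+ * <p>Per-method rates are kept in {@link #metricsList}, keyed by method
+ * name, e.g. (hypothetical method name): metricsList.get("getRow").inc(time)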
+ *
+ */
+public class HBaseRpcMetrics implements Updater {
+ private MetricsRecord metricsRecord;
+ private static Log LOG = LogFactory.getLog(HBaseRpcMetrics.class);
+
+ public HBaseRpcMetrics(String hostName, String port, HBaseServer server) {
+ MetricsContext context = MetricsUtil.getContext("rpc");
+ metricsRecord = MetricsUtil.createRecord(context, "metrics");
+
+ metricsRecord.setTag("port", port);
+
+ LOG.info("Initializing RPC Metrics with hostName="
+ + hostName + ", port=" + port);
+
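+ // Register so the context invokes doUpdates() on each update interval.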
+ context.registerUpdater(this);
+ }
+
+
+ /**
+ * The metrics variables are public:
+ * - they can be set directly by calling their set/inc methods
+ * - they can also be read directly - e.g. JMX does this.
+ */
+
+ public MetricsTimeVaryingRate rpcQueueTime = new MetricsTimeVaryingRate("RpcQueueTime");
+ public MetricsTimeVaryingRate rpcProcessingTime = new MetricsTimeVaryingRate("RpcProcessingTime");
+
+ public Map<String, MetricsTimeVaryingRate> metricsList =
+ Collections.synchronizedMap(new HashMap<String, MetricsTimeVaryingRate>());
+
+
+
+ /**
+ * Push the metrics to the monitoring subsystem when doUpdates() is called.
+ */
+ public void doUpdates(MetricsContext context) {
+ rpcQueueTime.pushMetric(metricsRecord);
+ rpcProcessingTime.pushMetric(metricsRecord);
+
+ synchronized (metricsList) {
+ // Iterate through the per-method metrics map to propagate each rpc metric.
+ for (MetricsTimeVaryingRate value : metricsList.values()) {
+ value.pushMetric(metricsRecord);
+ }
+ }
+ metricsRecord.update();
+ }
+
+ public void shutdown() {
+ // Nothing to do
+ }
+}
\ No newline at end of file