You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2008/10/09 00:49:50 UTC
svn commit: r703013 - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/ipc/
src/test/org/apache/hadoop/hbase/
Author: stack
Date: Wed Oct 8 15:49:50 2008
New Revision: 703013
URL: http://svn.apache.org/viewvc?rev=703013&view=rev
Log:
HBASE-576 Investigate IPC performance
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestCompare.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Wed Oct 8 15:49:50 2008
@@ -26,6 +26,7 @@
(Doğacan Güney via Stack)
HBASE-908 Add approximate counting to CountingBloomFilter
(Andrzej Bialecki via Stack)
+ HBASE-576 Investigate IPC performance
NEW FEATURES
HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Wed Oct 8 15:49:50 2008
@@ -126,6 +126,11 @@
addToMap(RowResult.class, code++);
addToMap(HRegionInfo[].class, code++);
addToMap(MapWritable.class, code++);
+ try {
+ addToMap(Class.forName("[Lorg.apache.hadoop.hbase.io.RowResult;"), code++);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
}
private Class<?> declaredClass;
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java Wed Oct 8 15:49:50 2008
@@ -38,8 +38,9 @@
* -- HADOOP-2495 and then to 3 when we changed the RPC to send codes instead
* of actual class names (HADOOP-2519).
* <p>Version 4 when we moved to all byte arrays (HBASE-42).
+ * <p>Version 5 HBASE-576.
*/
- public static final long versionID = 4L;
+ public static final long versionID = 5L;
/** @return true if master is available */
public boolean isMasterRunning();
@@ -126,4 +127,4 @@
* @return address of server that serves the root region
*/
public HServerAddress findRootRegion();
-}
\ No newline at end of file
+}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java Wed Oct 8 15:49:50 2008
@@ -30,6 +30,7 @@
/**
* HRegionServers interact with the HMasterRegionInterface to report on local
* goings-on and to obtain data-handling instructions from the HMaster.
+ * <p>Changes here need to be reflected in HbaseObjectWritable HbaseRPC#Invoker.
*/
public interface HMasterRegionInterface extends VersionedProtocol {
/**
@@ -38,8 +39,9 @@
* MapWritable instead of a HbaseMapWritable as part of HBASE-82 changes.
* Version 3 was when HMsg was refactored so it could carry optional
* messages (HBASE-504).
+ * <p>HBASE-576 we moved this to 4.
*/
- public static final long versionID = 3L;
+ public static final long versionID = 4L;
/**
* Called when a region server first starts
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Wed Oct 8 15:49:50 2008
@@ -37,8 +37,9 @@
/**
* Protocol version.
* Upped to 5 when we added scanner caching
+ * <p>HBASE-576, we moved this to 6.
*/
- public static final long versionID = 5L;
+ public static final long versionID = 6L;
/**
* Get metainfo about an HRegion
@@ -220,4 +221,4 @@
*/
public void unlockRow(final byte [] regionName, final long lockId)
throws IOException;
-}
\ No newline at end of file
+}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java Wed Oct 8 15:49:50 2008
@@ -30,6 +30,8 @@
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
@@ -40,8 +42,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.HBaseClient;
@@ -82,11 +83,28 @@
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC");
- private HbaseRPC() {} // no public ctor
+ private HbaseRPC() {
+ super();
+ } // no public ctor
/** A method invocation, including the method name and its parameters.*/
private static class Invocation implements Writable, Configurable {
+ // Here we maintain two static maps of method names to code and vice versa.
+ private static final Map<Byte, String> CODE_TO_METHODNAME =
+ new HashMap<Byte, String>();
+ private static final Map<String, Byte> METHODNAME_TO_CODE =
+ new HashMap<String, Byte>();
+ // Special code that means 'not-encoded'.
+ private static final byte NOT_ENCODED = 0;
+ static {
+ byte code = NOT_ENCODED + 1;
+ code = addToMap(VersionedProtocol.class, code);
+ code = addToMap(HMasterInterface.class, code);
+ code = addToMap(HMasterRegionInterface.class, code);
+ code = addToMap(TransactionalRegionInterface.class, code);
+ }
+
private String methodName;
@SuppressWarnings("unchecked")
private Class[] parameterClasses;
@@ -94,7 +112,9 @@
private Configuration conf;
/** default constructor */
- public Invocation() {}
+ public Invocation() {
+ super();
+ }
/**
* @param method
@@ -117,21 +137,23 @@
public Object[] getParameters() { return parameters; }
public void readFields(DataInput in) throws IOException {
- methodName = Text.readString(in);
+ byte code = in.readByte();
+ methodName = CODE_TO_METHODNAME.get(Byte.valueOf(code));
parameters = new Object[in.readInt()];
parameterClasses = new Class[parameters.length];
- ObjectWritable objectWritable = new ObjectWritable();
+ HbaseObjectWritable objectWritable = new HbaseObjectWritable();
for (int i = 0; i < parameters.length; i++) {
- parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
+ parameters[i] = HbaseObjectWritable.readObject(in, objectWritable,
+ this.conf);
parameterClasses[i] = objectWritable.getDeclaredClass();
}
}
public void write(DataOutput out) throws IOException {
- Text.writeString(out, methodName);
+ writeMethodNameCode(out, this.methodName);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
- ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
+ HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
conf);
}
}
@@ -157,7 +179,54 @@
public Configuration getConf() {
return this.conf;
}
+
+ private static void addToMap(final String name, final byte code) {
+ if (METHODNAME_TO_CODE.containsKey(name)) {
+ return;
+ }
+ METHODNAME_TO_CODE.put(name, Byte.valueOf(code));
+ CODE_TO_METHODNAME.put(Byte.valueOf(code), name);
+ }
+
+ /*
+ * @param c Class whose methods we'll add to the map of methods to codes
+ * (and vice versa).
+ * @param code Current state of the byte code.
+ * @return State of <code>code</code> when this method is done.
+ */
+ private static byte addToMap(final Class<?> c, final byte code) {
+ byte localCode = code;
+ Method [] methods = c.getMethods();
+ // There are no guarantees about the order in which methods are returned,
+ // so do a sort (was seeing that the order was one way on one server and
+ // another on a different server).
+ Arrays.sort(methods, new Comparator<Method>() {
+ public int compare(Method left, Method right) {
+ return left.getName().compareTo(right.getName());
+ }
+ });
+ for (int i = 0; i < methods.length; i++) {
+ addToMap(methods[i].getName(), localCode++);
+ }
+ return localCode;
+ }
+ /*
+ * Write out the code byte for the passed method name.
+ * @param out
+ * @param methodname
+ * @throws IOException
+ */
+ static void writeMethodNameCode(final DataOutput out, final String methodname)
+ throws IOException {
+ Byte code = METHODNAME_TO_CODE.get(methodname);
+ if (code == null) {
+ LOG.error("Unsupported type " + methodname);
+ throw new UnsupportedOperationException("No code for unexpected " +
+ methodname);
+ }
+ out.writeByte(code.byteValue());
+ }
}
/* Cache a client using its socket factory as the hash key */
@@ -181,7 +250,7 @@
// per-job, we choose (a).
Client client = clients.get(factory);
if (client == null) {
- client = new HBaseClient(ObjectWritable.class, conf, factory);
+ client = new HBaseClient(HbaseObjectWritable.class, conf, factory);
clients.put(factory, client);
} else {
((HBaseClient)client).incCount();
@@ -217,7 +286,7 @@
}
}
- private static ClientCache CLIENTS=new ClientCache();
+ private static ClientCache CLIENTS = new ClientCache();
private static class Invoker implements InvocationHandler {
private InetSocketAddress address;
@@ -242,7 +311,7 @@
Method method, Object[] args)
throws Throwable {
long startTime = System.currentTimeMillis();
- ObjectWritable value = (ObjectWritable)
+ HbaseObjectWritable value = (HbaseObjectWritable)
client.call(new Invocation(method, args), address, ticket);
long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -379,8 +448,8 @@
*/
public static VersionedProtocol getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
- Configuration conf, SocketFactory factory) throws IOException {
-
+ Configuration conf, SocketFactory factory)
+ throws IOException {
VersionedProtocol proxy =
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
@@ -452,7 +521,7 @@
(Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
for (int i = 0; i < values.length; i++)
if (wrappedValues[i] != null)
- values[i] = ((ObjectWritable)wrappedValues[i]).get();
+ values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
return values;
} finally {
@@ -545,7 +614,6 @@
try {
Invocation call = (Invocation)param;
if (verbose) log("Call: " + call);
-
Method method =
implementation.getMethod(call.getMethodName(),
call.getParameterClasses());
@@ -573,7 +641,7 @@
if (verbose) log("Return: "+value);
- return new ObjectWritable(method.getReturnType(), value);
+ return new HbaseObjectWritable(method.getReturnType(), value);
} catch (InvocationTargetException e) {
Throwable target = e.getTargetException();
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java Wed Oct 8 15:49:50 2008
@@ -29,8 +29,10 @@
*
*/
public interface TransactionalRegionInterface extends HRegionInterface {
- /** Interface version number */
- public static final long versionID = 1L;
+ /** Interface version number
+ * Moved to 2 for hbase-576.
+ */
+ public static final long versionID = 2L;
/**
* Sent to initiate a transaction.
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java Wed Oct 8 15:49:50 2008
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.PrintStream;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
@@ -51,7 +52,8 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.log4j.Logger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
@@ -71,12 +73,12 @@
* runs an individual client. Each client does about 1GB of data.
*/
public class PerformanceEvaluation implements HConstants {
- static final Logger LOG =
- Logger.getLogger(PerformanceEvaluation.class.getName());
+ private static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
private static final int ROW_LENGTH = 1000;
private static final int ONE_GB = 1024 * 1024 * 1000;
private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;
+
static final byte [] COLUMN_NAME = Bytes.toBytes(COLUMN_FAMILY_STR + "data");
protected static HTableDescriptor tableDescriptor;
@@ -102,6 +104,7 @@
volatile HBaseConfiguration conf;
private boolean miniCluster = false;
+ private boolean nomapred = false;
private int N = 1;
private int R = ROWS_PER_GB;
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
@@ -223,10 +226,68 @@
private void runNIsMoreThanOne(final String cmd)
throws IOException {
checkTable(new HBaseAdmin(conf));
-
- // Run a mapreduce job. Run as many maps as asked-for clients.
- // Before we start up the job, write out an input file with instruction
- // per client regards which row they are to start on.
+ if (this.nomapred) {
+ doMultipleClients(cmd);
+ } else {
+ doMapReduce(cmd);
+ }
+ }
+
+ /*
+ * Run all clients in this vm each to its own thread.
+ * @param cmd Command to run.
+ * @throws IOException
+ */
+ @SuppressWarnings("unused")
+ private void doMultipleClients(final String cmd) throws IOException {
+ final List<Thread> threads = new ArrayList<Thread>(this.N);
+ final int perClientRows = R/N;
+ for (int i = 0; i < this.N; i++) {
+ Thread t = new Thread (Integer.toString(i)) {
+ @Override
+ public void run() {
+ super.run();
+ PerformanceEvaluation pe = new PerformanceEvaluation(conf);
+ int index = Integer.parseInt(getName());
+ try {
+ long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
+ perClientRows, perClientRows,
+ new Status() {
+ public void setStatus(final String msg) throws IOException {
+ LOG.info("client-" + getName() + " " + msg);
+ }
+ });
+ LOG.info("Finished " + getName() + " in " + elapsedTime +
+ "ms writing " + perClientRows + " rows");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ threads.add(t);
+ }
+ for (Thread t: threads) {
+ t.start();
+ }
+ for (Thread t: threads) {
+ while(t.isAlive()) {
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted, continuing" + e.toString());
+ }
+ }
+ }
+ }
+
+ /*
+ * Run a mapreduce job. Run as many maps as asked-for clients.
+ * Before we start up the job, write out an input file with instruction
+ * per client regards which row they are to start on.
+ * @param cmd Command to run.
+ * @throws IOException
+ */
+ private void doMapReduce(final String cmd) throws IOException {
Path inputDir = writeInputFile(this.conf);
this.conf.set(EvaluationMapTask.CMD_KEY, cmd);
JobConf job = new JobConf(this.conf, this.getClass());
@@ -274,7 +335,7 @@
}
return subdir;
}
-
+
/*
* A test.
* Subclass to particularize what happens per row.
@@ -559,7 +620,7 @@
if (cmd.equals(RANDOM_READ_MEM)) {
// For this one test, so all fits in memory, make R smaller (See
// pg. 9 of BigTable paper).
- R = (ONE_GB / 10) * N;
+ R = (this.R / 10) * N;
}
MiniHBaseCluster hbaseMiniCluster = null;
@@ -574,7 +635,6 @@
conf.set(HConstants.HBASE_DIR, parentdir.toString());
fs.mkdirs(parentdir);
FSUtils.setVersion(fs, parentdir);
-
hbaseMiniCluster = new MiniHBaseCluster(this.conf, N);
}
@@ -604,13 +664,17 @@
System.err.println(message);
}
System.err.println("Usage: java " + this.getClass().getName() +
- "[--master=host:port] [--miniCluster] <command> <nclients>");
+ " [--master=HOST:PORT] \\");
+ System.err.println(" [--miniCluster] [--nomapred] [--rows=ROWS] <command> <nclients>");
System.err.println();
System.err.println("Options:");
System.err.println(" master Specify host and port of HBase " +
"cluster master. If not present,");
System.err.println(" address is read from configuration");
System.err.println(" miniCluster Run the test on an HBaseMiniCluster");
+ System.err.println(" nomapred Run multiple clients using threads " +
+ "(rather than use mapreduce)");
+ System.err.println(" rows Rows each client runs. Default: One million");
System.err.println();
System.err.println("Command:");
System.err.println(" randomRead Run random read test");
@@ -643,7 +707,7 @@
}
// Set total number of rows to write.
- R = ROWS_PER_GB * N;
+ this.R = this.R * N;
}
private int doCommandLine(final String[] args) {
@@ -675,6 +739,18 @@
this.miniCluster = true;
continue;
}
+
+ final String nmr = "--nomapred";
+ if (cmd.startsWith(nmr)) {
+ this.nomapred = true;
+ continue;
+ }
+
+ final String rows = "--rows=";
+ if (cmd.startsWith(rows)) {
+ this.R = Integer.parseInt(cmd.substring(rows.length()));
+ continue;
+ }
if (COMMANDS.contains(cmd)) {
getArgs(i + 1, args);
@@ -699,6 +775,6 @@
public static void main(final String[] args) {
HBaseConfiguration c = new HBaseConfiguration();
System.exit(new PerformanceEvaluation(c).
- doCommandLine(args));
+ doCommandLine(args));
}
}
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestCompare.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestCompare.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestCompare.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestCompare.java Wed Oct 8 15:49:50 2008
@@ -59,7 +59,7 @@
*/
public void testHStoreKeyBorderCases() {
HRegionInfo info = new HRegionInfo(new HTableDescriptor("testtable"),
- HConstants.EMPTY_BYTE_ARRAY,HConstants.EMPTY_BYTE_ARRAY);
+ HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
HStoreKey rowA = new HStoreKey("testtable,www.hbase.org/,1234",
"", Long.MAX_VALUE, info);
HStoreKey rowB = new HStoreKey("testtable,www.hbase.org/%20,99999",