You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/03/20 20:39:51 UTC
svn commit: r1459015 [7/8] - in /hbase/branches/0.95:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/
hbase-client/src/test/...
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Wed Mar 20 19:39:50 2013
@@ -19,16 +19,19 @@
package org.apache.hadoop.hbase.ipc;
-import com.google.common.base.Function;
-import com.google.protobuf.Message;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.util.Pair;
-import java.io.IOException;
-import java.net.InetSocketAddress;
+import com.google.common.base.Function;
+import com.google.protobuf.Message;
@InterfaceAudience.Private
public interface RpcServer {
@@ -47,19 +50,19 @@ public interface RpcServer {
InetSocketAddress getListenerAddress();
/** Called for each call.
+ * @param method Method to invoke.
* @param param parameter
* @param receiveTime time
- * @return Message Protobuf response Message
+ * @param status
+ * @return Message Protobuf response Message and optionally the Cells that make up the response.
* @throws java.io.IOException e
*/
- Message call(Class<? extends IpcProtocol> protocol,
- RpcRequestBody param, long receiveTime, MonitoredRPCHandler status)
- throws IOException;
+ Pair<Message, CellScanner> call(Class<? extends IpcProtocol> protocol, Method method,
+ Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
+ throws IOException;
void setErrorHandler(HBaseRPCErrorHandler handler);
- void setQosFunction(Function<RpcRequestBody, Integer> newFunc);
-
void openServer();
void startThreads();
@@ -68,4 +71,6 @@ public interface RpcServer {
* Returns the metrics instance for reporting RPC call statistics
*/
MetricsHBaseServer getMetrics();
+
+ public void setQosFunction(Function<Pair<RequestHeader, Message>, Integer> newFunc);
}
\ No newline at end of file
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java Wed Mar 20 19:39:50 2013
@@ -17,13 +17,13 @@
*/
package org.apache.hadoop.hbase.mapreduce;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;
@@ -45,43 +45,41 @@ public class KeyValueSerialization imple
}
public static class KeyValueDeserializer implements Deserializer<KeyValue> {
- private InputStream is;
+ private DataInputStream dis;
@Override
public void close() throws IOException {
- this.is.close();
+ this.dis.close();
}
@Override
public KeyValue deserialize(KeyValue ignore) throws IOException {
// I can't overwrite the passed in KV, not from a proto kv, not just yet. TODO
- HBaseProtos.KeyValue proto =
- HBaseProtos.KeyValue.parseDelimitedFrom(this.is);
- return ProtobufUtil.toKeyValue(proto);
+ return KeyValue.create(this.dis);
}
@Override
public void open(InputStream is) throws IOException {
- this.is = is;
+ this.dis = new DataInputStream(is);
}
}
public static class KeyValueSerializer implements Serializer<KeyValue> {
- private OutputStream os;
+ private DataOutputStream dos;
@Override
public void close() throws IOException {
- this.os.close();
+ this.dos.close();
}
@Override
public void open(OutputStream os) throws IOException {
- this.os = os;
+ this.dos = new DataOutputStream(os);
}
@Override
public void serialize(KeyValue kv) throws IOException {
- ProtobufUtil.toKeyValue(kv).writeDelimitedTo(this.os);
+ KeyValue.write(kv, this.dos);
}
}
}
\ No newline at end of file
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java Wed Mar 20 19:39:50 2013
@@ -25,8 +25,8 @@ import org.apache.hadoop.hbase.client.De
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;
@@ -57,7 +57,7 @@ public class MutationSerialization imple
@Override
public Mutation deserialize(Mutation mutation) throws IOException {
- Mutate proto = Mutate.parseDelimitedFrom(in);
+ MutationProto proto = MutationProto.parseDelimitedFrom(in);
return ProtobufUtil.toMutation(proto);
}
@@ -82,15 +82,15 @@ public class MutationSerialization imple
@Override
public void serialize(Mutation mutation) throws IOException {
- MutateType type;
+ MutationType type;
if (mutation instanceof Put) {
- type = MutateType.PUT;
+ type = MutationType.PUT;
} else if (mutation instanceof Delete) {
- type = MutateType.DELETE;
+ type = MutationType.DELETE;
} else {
throw new IllegalArgumentException("Only Put and Delete are supported");
}
- ProtobufUtil.toMutate(type, mutation).writeDelimitedTo(out);
+ ProtobufUtil.toMutation(type, mutation).writeDelimitedTo(out);
}
}
}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandler.java Wed Mar 20 19:39:50 2013
@@ -19,7 +19,8 @@
package org.apache.hadoop.hbase.monitoring;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+
+import com.google.protobuf.Message;
/**
* A MonitoredTask implementation optimized for use with RPC Handlers
@@ -37,8 +38,7 @@ public interface MonitoredRPCHandler ext
public abstract boolean isRPCRunning();
public abstract boolean isOperationRunning();
- public abstract void setRPC(String methodName, Object [] params,
- long queueTime);
- public abstract void setRPCPacket(RpcRequestBody param);
+ public abstract void setRPC(String methodName, Object [] params, long queueTime);
+ public abstract void setRPCPacket(Message param);
public abstract void setConnection(String clientAddress, int remotePort);
}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java Wed Mar 20 19:39:50 2013
@@ -18,19 +18,15 @@
*/
package org.apache.hadoop.hbase.monitoring;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Operation;
-import org.apache.hadoop.hbase.io.WritableWithSize;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Writable;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
+import com.google.protobuf.Message;
/**
* A MonitoredTask implementation designed for use with RPC Handlers
@@ -46,7 +42,7 @@ public class MonitoredRPCHandlerImpl ext
private long rpcStartTime;
private String methodName = "";
private Object [] params = {};
- private RpcRequestBody packet;
+ private Message packet;
public MonitoredRPCHandlerImpl() {
super();
@@ -201,7 +197,7 @@ public class MonitoredRPCHandlerImpl ext
* that it can later compute its size if asked for it.
* @param param The protobuf received by the RPC for this call
*/
- public void setRPCPacket(RpcRequestBody param) {
+ public void setRPCPacket(Message param) {
this.packet = param;
}
@@ -257,4 +253,4 @@ public class MonitoredRPCHandlerImpl ext
}
return super.toString() + ", rpcMethod=" + getRPC();
}
-}
+}
\ No newline at end of file
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java Wed Mar 20 19:39:50 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@@ -205,8 +206,9 @@ public class ZKProcedureMemberRpcs imple
byte[] data = ZKUtil.getData(zkController.getWatcher(), path);
LOG.debug("start proc data length is " + data.length);
if (!ProtobufUtil.isPBMagicPrefix(data)) {
- String msg = "Data in for starting procuedure " + opName + " is illegally formatted. "
- + "Killing the procedure.";
+ String msg = "Data in for starting procuedure " + opName +
+ " is illegally formatted (no pb magic). " +
+ "Killing the procedure: " + Bytes.toString(data);
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Mar 20 19:39:50 2013
@@ -25,7 +25,6 @@ import java.lang.annotation.RetentionPol
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -58,14 +57,17 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.exceptions.ClockOutOfSyncException;
-import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.LeaseException;
import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
@@ -107,6 +109,7 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
import org.apache.hadoop.hbase.ipc.RpcClientEngine;
import org.apache.hadoop.hbase.ipc.RpcServer;
@@ -156,10 +159,10 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
@@ -168,7 +171,6 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
@@ -212,11 +214,11 @@ import org.apache.hadoop.util.StringUtil
import org.apache.zookeeper.KeeperException;
import org.cliffc.high_scale_lib.Counter;
-import com.google.common.base.Function;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
@@ -458,8 +460,7 @@ public class HRegionServer implements Cl
// Config'ed params
this.numRetries = conf.getInt("hbase.client.retries.number", 10);
- this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
- 10 * 1000);
+ this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
this.sleeper = new Sleeper(this.msgInterval, this);
@@ -507,7 +508,7 @@ public class HRegionServer implements Cl
this.isa = this.rpcServer.getListenerAddress();
this.rpcServer.setErrorHandler(this);
- this.rpcServer.setQosFunction((qosFunction = new QosFunction()));
+ this.rpcServer.setQosFunction((qosFunction = new QosFunction(this)));
this.startcode = System.currentTimeMillis();
// login the zookeeper client principal (if using security)
@@ -567,152 +568,6 @@ public class HRegionServer implements Cl
}
/**
- * Utility used ensuring higher quality of service for priority rpcs; e.g.
- * rpcs to .META., etc.
- */
- class QosFunction implements Function<RpcRequestBody,Integer> {
- private final Map<String, Integer> annotatedQos;
- //We need to mock the regionserver instance for some unit tests (set via
- //setRegionServer method.
- //The field value is initially set to the enclosing instance of HRegionServer.
- private HRegionServer hRegionServer = HRegionServer.this;
-
- //The logic for figuring out high priority RPCs is as follows:
- //1. if the method is annotated with a QosPriority of QOS_HIGH,
- // that is honored
- //2. parse out the protobuf message and see if the request is for meta
- // region, and if so, treat it as a high priority RPC
- //Some optimizations for (2) are done here -
- //Clients send the argument classname as part of making the RPC. The server
- //decides whether to deserialize the proto argument message based on the
- //pre-established set of argument classes (knownArgumentClasses below).
- //This prevents the server from having to deserialize all proto argument
- //messages prematurely.
- //All the argument classes declare a 'getRegion' method that returns a
- //RegionSpecifier object. Methods can be invoked on the returned object
- //to figure out whether it is a meta region or not.
- @SuppressWarnings("unchecked")
- private final Class<? extends Message>[] knownArgumentClasses = new Class[]{
- GetRegionInfoRequest.class,
- GetStoreFileRequest.class,
- CloseRegionRequest.class,
- FlushRegionRequest.class,
- SplitRegionRequest.class,
- CompactRegionRequest.class,
- GetRequest.class,
- MutateRequest.class,
- ScanRequest.class,
- MultiRequest.class
- };
-
- //Some caches for helping performance
- private final Map<String, Class<? extends Message>> argumentToClassMap =
- new HashMap<String, Class<? extends Message>>();
- private final Map<String, Map<Class<? extends Message>, Method>>
- methodMap = new HashMap<String, Map<Class<? extends Message>, Method>>();
-
- public QosFunction() {
- Map<String, Integer> qosMap = new HashMap<String, Integer>();
- for (Method m : HRegionServer.class.getMethods()) {
- QosPriority p = m.getAnnotation(QosPriority.class);
- if (p != null) {
- qosMap.put(m.getName(), p.priority());
- }
- }
-
- annotatedQos = qosMap;
- if (methodMap.get("parseFrom") == null) {
- methodMap.put("parseFrom",
- new HashMap<Class<? extends Message>, Method>());
- }
- if (methodMap.get("getRegion") == null) {
- methodMap.put("getRegion",
- new HashMap<Class<? extends Message>, Method>());
- }
- for (Class<? extends Message> cls : knownArgumentClasses) {
- argumentToClassMap.put(cls.getCanonicalName(), cls);
- try {
- methodMap.get("parseFrom").put(cls,
- cls.getDeclaredMethod("parseFrom",ByteString.class));
- methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- void setRegionServer(HRegionServer server) {
- this.hRegionServer = server;
- }
-
- public boolean isMetaRegion(byte[] regionName) {
- HRegion region;
- try {
- region = hRegionServer.getRegion(regionName);
- } catch (NotServingRegionException ignored) {
- return false;
- }
- return region.getRegionInfo().isMetaRegion();
- }
-
- @Override
- public Integer apply(RpcRequestBody from) {
- String methodName = from.getMethodName();
- Class<? extends Message> rpcArgClass = null;
- if (from.hasRequestClassName()) {
- String cls = from.getRequestClassName();
- rpcArgClass = argumentToClassMap.get(cls);
- }
-
- Integer priorityByAnnotation = annotatedQos.get(methodName);
- if (priorityByAnnotation != null) {
- return priorityByAnnotation;
- }
-
- if (rpcArgClass == null || from.getRequest().isEmpty()) {
- return HConstants.NORMAL_QOS;
- }
- Object deserializedRequestObj;
- //check whether the request has reference to Meta region
- try {
- Method parseFrom = methodMap.get("parseFrom").get(rpcArgClass);
- deserializedRequestObj = parseFrom.invoke(null, from.getRequest());
- Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
- RegionSpecifier regionSpecifier =
- (RegionSpecifier)getRegion.invoke(deserializedRequestObj,
- (Object[])null);
- HRegion region = hRegionServer.getRegion(regionSpecifier);
- if (region.getRegionInfo().isMetaTable()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("High priority: " + from.toString());
- }
- return HConstants.HIGH_QOS;
- }
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
-
- if (methodName.equals("scan")) { // scanner methods...
- ScanRequest request = (ScanRequest)deserializedRequestObj;
- if (!request.hasScannerId()) {
- return HConstants.NORMAL_QOS;
- }
- RegionScanner scanner = hRegionServer.getScanner(request.getScannerId());
- if (scanner != null && scanner.getRegionInfo().isMetaTable()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("High priority scanner request: " + request.getScannerId());
- }
- return HConstants.HIGH_QOS;
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Low priority: " + from.toString());
- }
- return HConstants.NORMAL_QOS;
- }
- }
-
- /**
* All initialization needed before we go register with Master.
*
* @throws IOException
@@ -1448,8 +1303,8 @@ public class HRegionServer implements Cl
Path logdir = new Path(rootDir, logName);
if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
- this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(),
- rootDir, logName, this.conf, getMetaWALActionListeners(),
+ this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(),
+ rootDir, logName, this.conf, getMetaWALActionListeners(),
this.serverNameFromMasterPOV.toString());
}
return this.hlogForMeta;
@@ -1551,7 +1406,7 @@ public class HRegionServer implements Cl
".compactionChecker", uncaughtExceptionHandler);
if (this.healthCheckChore != null) {
Threads
- .setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
+ .setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker",
uncaughtExceptionHandler);
}
@@ -1645,17 +1500,17 @@ public class HRegionServer implements Cl
return getWAL(null);
} catch (IOException e) {
LOG.warn("getWAL threw exception " + e);
- return null;
+ return null;
}
}
@Override
public HLog getWAL(HRegionInfo regionInfo) throws IOException {
//TODO: at some point this should delegate to the HLogFactory
- //currently, we don't care about the region as much as we care about the
+ //currently, we don't care about the region as much as we care about the
//table.. (hence checking the tablename below)
- //_ROOT_ and .META. regions have separate WAL.
- if (regionInfo != null &&
+ //_ROOT_ and .META. regions have separate WAL.
+ if (regionInfo != null &&
regionInfo.isMetaTable()) {
return getMetaWAL();
}
@@ -1749,15 +1604,15 @@ public class HRegionServer implements Cl
if (cause != null) {
msg += "\nCause:\n" + StringUtils.stringifyException(cause);
}
- if (hbaseMaster != null) {
+ // Report to the master but only if we have already registered with the master.
+ if (hbaseMaster != null && this.serverNameFromMasterPOV != null) {
ReportRSFatalErrorRequest.Builder builder =
ReportRSFatalErrorRequest.newBuilder();
ServerName sn =
ServerName.parseVersionedServerName(this.serverNameFromMasterPOV.getVersionedBytes());
builder.setServer(ProtobufUtil.toServerName(sn));
builder.setErrorMessage(msg);
- hbaseMaster.reportRSFatalError(
- null,builder.build());
+ hbaseMaster.reportRSFatalError(null, builder.build());
}
} catch (Throwable t) {
LOG.warn("Unable to report fatal error to master", t);
@@ -2805,33 +2660,39 @@ public class HRegionServer implements Cl
/**
* Mutate data in a table.
*
- * @param controller the RPC controller
+ * @param rpcc the RPC controller
* @param request the mutate request
* @throws ServiceException
*/
@Override
- public MutateResponse mutate(final RpcController controller,
+ public MutateResponse mutate(final RpcController rpcc,
final MutateRequest request) throws ServiceException {
+ // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
+ // It is also the conduit via which we pass back data.
+ PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
+ CellScanner cellScanner = controller != null? controller.cellScanner(): null;
+ // Clear scanner so we are not holding on to reference across call.
+ controller.setCellScanner(null);
try {
requestCount.increment();
HRegion region = getRegion(request.getRegion());
MutateResponse.Builder builder = MutateResponse.newBuilder();
- Mutate mutate = request.getMutate();
+ MutationProto mutation = request.getMutation();
if (!region.getRegionInfo().isMetaTable()) {
cacheFlusher.reclaimMemStoreMemory();
}
Result r = null;
Boolean processed = null;
- MutateType type = mutate.getMutateType();
+ MutationType type = mutation.getMutateType();
switch (type) {
case APPEND:
- r = append(region, mutate);
+ r = append(region, mutation, cellScanner);
break;
case INCREMENT:
- r = increment(region, mutate);
+ r = increment(region, mutation, cellScanner);
break;
case PUT:
- Put put = ProtobufUtil.toPut(mutate);
+ Put put = ProtobufUtil.toPut(mutation, cellScanner);
if (request.hasCondition()) {
Condition condition = request.getCondition();
byte[] row = condition.getRow().toByteArray();
@@ -2859,7 +2720,7 @@ public class HRegionServer implements Cl
}
break;
case DELETE:
- Delete delete = ProtobufUtil.toDelete(mutate);
+ Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
if (request.hasCondition()) {
Condition condition = request.getCondition();
byte[] row = condition.getRow().toByteArray();
@@ -2890,10 +2751,15 @@ public class HRegionServer implements Cl
throw new DoNotRetryIOException(
"Unsupported mutate type: " + type.name());
}
+ CellScannable cellsToReturn = null;
if (processed != null) {
builder.setProcessed(processed.booleanValue());
} else if (r != null) {
- builder.setResult(ProtobufUtil.toResult(r));
+ builder.setResult(ProtobufUtil.toResultNoData(r));
+ cellsToReturn = r;
+ }
+ if (cellsToReturn != null) {
+ controller.setCellScanner(cellsToReturn.cellScanner());
}
return builder.build();
} catch (IOException ie) {
@@ -3006,7 +2872,8 @@ public class HRegionServer implements Cl
if (rsh != null) {
if (request.getNextCallSeq() != rsh.nextCallSeq) {
throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
- + " But the nextCallSeq got from client: " + request.getNextCallSeq());
+ + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
+ "; request=" + TextFormat.shortDebugString(request));
}
// Increment the nextCallSeq value which is the next expected from client.
rsh.nextCallSeq++;
@@ -3208,47 +3075,61 @@ public class HRegionServer implements Cl
/**
* Execute multiple actions on a table: get, mutate, and/or execCoprocessor
*
- * @param controller the RPC controller
+ * @param rpcc the RPC controller
* @param request the multi request
* @throws ServiceException
*/
@Override
- public MultiResponse multi(final RpcController controller,
- final MultiRequest request) throws ServiceException {
+ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
+ throws ServiceException {
+ // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
+ // It is also the conduit via which we pass back data.
+ PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
+ CellScanner cellScanner = controller != null? controller.cellScanner(): null;
+ // Clear scanner so we are not holding on to reference across call.
+ controller.setCellScanner(null);
+ List<CellScannable> cellsToReturn = null;
try {
HRegion region = getRegion(request.getRegion());
MultiResponse.Builder builder = MultiResponse.newBuilder();
+ List<MutationProto> mutations = new ArrayList<MutationProto>(request.getActionCount());
+ // Do a bunch of mutations atomically. Mutations are Puts and Deletes. NOT Gets.
if (request.hasAtomic() && request.getAtomic()) {
- List<Mutate> mutates = new ArrayList<Mutate>();
+ // MultiAction is union type. Has a Get or a Mutate.
for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
- if (actionUnion.hasMutate()) {
- mutates.add(actionUnion.getMutate());
+ if (actionUnion.hasMutation()) {
+ mutations.add(actionUnion.getMutation());
} else {
- throw new DoNotRetryIOException(
- "Unsupported atomic action type: " + actionUnion);
+ throw new DoNotRetryIOException("Unsupported atomic action type: " + actionUnion);
}
}
- mutateRows(region, mutates);
+ // TODO: We are not updating a metric here. Should we up requestCount?
+ if (!mutations.isEmpty()) mutateRows(region, mutations, cellScanner);
} else {
+ // Do a bunch of Actions.
ActionResult.Builder resultBuilder = null;
- List<Mutate> mutates = new ArrayList<Mutate>();
+ cellsToReturn = new ArrayList<CellScannable>(request.getActionCount());
for (ClientProtos.MultiAction actionUnion : request.getActionList()) {
- requestCount.increment();
+ this.requestCount.increment();
+ ClientProtos.Result result = null;
try {
- ClientProtos.Result result = null;
if (actionUnion.hasGet()) {
Get get = ProtobufUtil.toGet(actionUnion.getGet());
Result r = region.get(get);
if (r != null) {
- result = ProtobufUtil.toResult(r);
+ // Get a result with no data. The data will be carried alongside pbs, not as pbs.
+ result = ProtobufUtil.toResultNoData(r);
+ // Add the Result to controller so it gets serialized apart from pb. Get
+ // Results could be big so good if they are not serialized as pb.
+ cellsToReturn.add(r);
}
- } else if (actionUnion.hasMutate()) {
- Mutate mutate = actionUnion.getMutate();
- MutateType type = mutate.getMutateType();
- if (type != MutateType.PUT && type != MutateType.DELETE) {
- if (!mutates.isEmpty()) {
- doBatchOp(builder, region, mutates);
- mutates.clear();
+ } else if (actionUnion.hasMutation()) {
+ MutationProto mutation = actionUnion.getMutation();
+ MutationType type = mutation.getMutateType();
+ if (type != MutationType.PUT && type != MutationType.DELETE) {
+ if (!mutations.isEmpty()) {
+ doBatchOp(builder, region, mutations, cellScanner);
+ mutations.clear();
} else if (!region.getRegionInfo().isMetaTable()) {
cacheFlusher.reclaimMemStoreMemory();
}
@@ -3256,22 +3137,23 @@ public class HRegionServer implements Cl
Result r = null;
switch (type) {
case APPEND:
- r = append(region, mutate);
+ r = append(region, mutation, cellScanner);
break;
case INCREMENT:
- r = increment(region, mutate);
+ r = increment(region, mutation, cellScanner);
break;
case PUT:
- mutates.add(mutate);
- break;
case DELETE:
- mutates.add(mutate);
+ mutations.add(mutation);
break;
default:
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
}
if (r != null) {
- result = ProtobufUtil.toResult(r);
+ // Put the data into the cellsToReturn and the metadata about the result is all that
+ // we will pass back in the protobuf result.
+ result = ProtobufUtil.toResultNoData(r);
+ cellsToReturn.add(r);
}
} else {
LOG.warn("Error: invalid action: " + actionUnion + ". "
@@ -3292,10 +3174,14 @@ public class HRegionServer implements Cl
builder.addResult(ResponseConverter.buildActionResult(ie));
}
}
- if (!mutates.isEmpty()) {
- doBatchOp(builder, region, mutates);
+ if (!mutations.isEmpty()) {
+ doBatchOp(builder, region, mutations, cellScanner);
}
}
+ // Load the controller with the Cells to return.
+ if (cellsToReturn != null && !cellsToReturn.isEmpty()) {
+ controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
+ }
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
@@ -3758,15 +3644,16 @@ public class HRegionServer implements Cl
* Execute an append mutation.
*
* @param region
- * @param mutate
+ * @param m
+ * @param cellScanner
* @return result to return to client if default operation should be
* bypassed as indicated by RegionObserver, null otherwise
* @throws IOException
*/
protected Result append(final HRegion region,
- final Mutate mutate) throws IOException {
+ final MutationProto m, final CellScanner cellScanner) throws IOException {
long before = EnvironmentEdgeManager.currentTimeMillis();
- Append append = ProtobufUtil.toAppend(mutate);
+ Append append = ProtobufUtil.toAppend(m, cellScanner);
Result r = null;
if (region.getCoprocessorHost() != null) {
r = region.getCoprocessorHost().preAppend(append);
@@ -3785,14 +3672,15 @@ public class HRegionServer implements Cl
* Execute an increment mutation.
*
* @param region
- * @param mutate
+ * @param mutation
* @return the Result
* @throws IOException
*/
- protected Result increment(final HRegion region,
- final Mutate mutate) throws IOException {
+ protected Result increment(final HRegion region, final MutationProto mutation,
+ final CellScanner cells)
+ throws IOException {
long before = EnvironmentEdgeManager.currentTimeMillis();
- Increment increment = ProtobufUtil.toIncrement(mutate);
+ Increment increment = ProtobufUtil.toIncrement(mutation, cells);
Result r = null;
if (region.getCoprocessorHost() != null) {
r = region.getCoprocessorHost().preIncrement(increment);
@@ -3812,12 +3700,12 @@ public class HRegionServer implements Cl
*
* @param builder
* @param region
- * @param mutates
+ * @param mutations
*/
- protected void doBatchOp(final MultiResponse.Builder builder,
- final HRegion region, final List<Mutate> mutates) {
+ protected void doBatchOp(final MultiResponse.Builder builder, final HRegion region,
+ final List<MutationProto> mutations, final CellScanner cells) {
@SuppressWarnings("unchecked")
- Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutates.size()];
+ Pair<Mutation, Integer>[] mutationsWithLocks = new Pair[mutations.size()];
long before = EnvironmentEdgeManager.currentTimeMillis();
boolean batchContainsPuts = false, batchContainsDelete = false;
try {
@@ -3825,21 +3713,20 @@ public class HRegionServer implements Cl
resultBuilder.setValue(ClientProtos.Result.newBuilder().build());
ActionResult result = resultBuilder.build();
int i = 0;
- for (Mutate m : mutates) {
+ for (MutationProto m : mutations) {
Mutation mutation;
- if (m.getMutateType() == MutateType.PUT) {
- mutation = ProtobufUtil.toPut(m);
+ if (m.getMutateType() == MutationType.PUT) {
+ mutation = ProtobufUtil.toPut(m, cells);
batchContainsPuts = true;
} else {
- mutation = ProtobufUtil.toDelete(m);
+ mutation = ProtobufUtil.toDelete(m, cells);
batchContainsDelete = true;
}
mutationsWithLocks[i++] = new Pair<Mutation, Integer>(mutation, null);
builder.addResult(result);
}
-
- requestCount.add(mutates.size());
+ requestCount.add(mutations.size());
if (!region.getRegionInfo().isMetaTable()) {
cacheFlusher.reclaimMemStoreMemory();
}
@@ -3871,7 +3758,7 @@ public class HRegionServer implements Cl
}
} catch (IOException ie) {
ActionResult result = ResponseConverter.buildActionResult(ie);
- for (int i = 0, n = mutates.size(); i < n; i++) {
+ for (int i = 0; i < mutations.size(); i++) {
builder.setResult(i, result);
}
}
@@ -3888,25 +3775,27 @@ public class HRegionServer implements Cl
* Mutate a list of rows atomically.
*
* @param region
- * @param mutates
+ * @param mutations
+ * @param cellScanner if non-null, the mutation data -- the Cell content.
* @throws IOException
*/
- protected void mutateRows(final HRegion region,
- final List<Mutate> mutates) throws IOException {
- Mutate firstMutate = mutates.get(0);
+ protected void mutateRows(final HRegion region, final List<MutationProto> mutations,
+ final CellScanner cellScanner)
+ throws IOException {
+ MutationProto firstMutate = mutations.get(0);
if (!region.getRegionInfo().isMetaTable()) {
cacheFlusher.reclaimMemStoreMemory();
}
- byte[] row = firstMutate.getRow().toByteArray();
+ byte [] row = firstMutate.getRow().toByteArray();
RowMutations rm = new RowMutations(row);
- for (Mutate mutate: mutates) {
- MutateType type = mutate.getMutateType();
+ for (MutationProto mutate: mutations) {
+ MutationType type = mutate.getMutateType();
switch (mutate.getMutateType()) {
case PUT:
- rm.add(ProtobufUtil.toPut(mutate));
+ rm.add(ProtobufUtil.toPut(mutate, cellScanner));
break;
case DELETE:
- rm.add(ProtobufUtil.toDelete(mutate));
+ rm.add(ProtobufUtil.toDelete(mutate, cellScanner));
break;
default:
throw new DoNotRetryIOException(
Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java?rev=1459015&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java Wed Mar 20 19:39:50 2013
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.regionserver.HRegionServer.QosPriority;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.protobuf.Message;
+import com.google.protobuf.TextFormat;
+
+
+/**
+ * A guava function that will return a priority for use by QoS facility in regionserver; e.g.
+ * rpcs to .META. and -ROOT-, etc., get priority.
+ */
+// TODO: Remove. This is doing way too much work just to figure a priority. Do as Elliott
+// suggests and just have the client specify a priority.
+
+//The logic for figuring out high priority RPCs is as follows:
+//1. if the method is annotated with a QosPriority of QOS_HIGH,
+// that is honored
+//2. parse out the protobuf message and see if the request is for meta
+// region, and if so, treat it as a high priority RPC
+//Some optimizations for (2) are done here -
+//Clients send the argument classname as part of making the RPC. The server
+//decides whether to deserialize the proto argument message based on the
+//pre-established set of argument classes (knownArgumentClasses below).
+//This prevents the server from having to deserialize all proto argument
+//messages prematurely.
+//All the argument classes declare a 'getRegion' method that returns a
+//RegionSpecifier object. Methods can be invoked on the returned object
+//to figure out whether it is a meta region or not.
+class QosFunction implements Function<Pair<RequestHeader, Message>, Integer> {
+ public static final Log LOG = LogFactory.getLog(QosFunction.class.getName());
+ private final Map<String, Integer> annotatedQos;
+ //We need to mock the regionserver instance for some unit tests (set via
+ //setRegionServer method.
+ private HRegionServer hRegionServer;
+ @SuppressWarnings("unchecked")
+ private final Class<? extends Message>[] knownArgumentClasses = new Class[]{
+ GetRegionInfoRequest.class,
+ GetStoreFileRequest.class,
+ CloseRegionRequest.class,
+ FlushRegionRequest.class,
+ SplitRegionRequest.class,
+ CompactRegionRequest.class,
+ GetRequest.class,
+ MutateRequest.class,
+ ScanRequest.class,
+ MultiRequest.class
+ };
+
+ // Some caches for helping performance
+ private final Map<String, Class<? extends Message>> argumentToClassMap =
+ new HashMap<String, Class<? extends Message>>();
+ private final Map<String, Map<Class<? extends Message>, Method>> methodMap =
+ new HashMap<String, Map<Class<? extends Message>, Method>>();
+
+ QosFunction(final HRegionServer hrs) {
+ this.hRegionServer = hrs;
+ Map<String, Integer> qosMap = new HashMap<String, Integer>();
+ for (Method m : HRegionServer.class.getMethods()) {
+ QosPriority p = m.getAnnotation(QosPriority.class);
+ if (p != null) {
+ qosMap.put(m.getName(), p.priority());
+ }
+ }
+ this.annotatedQos = qosMap;
+
+ if (methodMap.get("getRegion") == null) {
+ methodMap.put("hasRegion", new HashMap<Class<? extends Message>, Method>());
+ methodMap.put("getRegion", new HashMap<Class<? extends Message>, Method>());
+ }
+ for (Class<? extends Message> cls : knownArgumentClasses) {
+ argumentToClassMap.put(cls.getName(), cls);
+ try {
+ methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion"));
+ methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public boolean isMetaRegion(byte[] regionName) {
+ HRegion region;
+ try {
+ region = hRegionServer.getRegion(regionName);
+ } catch (NotServingRegionException ignored) {
+ return false;
+ }
+ return region.getRegionInfo().isMetaTable();
+ }
+
+ @Override
+ public Integer apply(Pair<RequestHeader, Message> headerAndParam) {
+ RequestHeader header = headerAndParam.getFirst();
+ String methodName = header.getMethodName();
+ Integer priorityByAnnotation = annotatedQos.get(header.getMethodName());
+ if (priorityByAnnotation != null) {
+ return priorityByAnnotation;
+ }
+
+ Message param = headerAndParam.getSecond();
+ if (param == null) {
+ return HConstants.NORMAL_QOS;
+ }
+ String cls = param.getClass().getName();
+ Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
+ RegionSpecifier regionSpecifier = null;
+ //check whether the request has reference to meta region or now.
+ try {
+ // Check if the param has a region specifier; the pb methods are hasRegion and getRegion if
+ // hasRegion returns true. Not all listed methods have region specifier each time. For
+ // example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than
+ // send the region over every time.
+ Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass);
+ if (hasRegion != null && (Boolean)hasRegion.invoke(param, (Object[])null)) {
+ Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
+ regionSpecifier = (RegionSpecifier)getRegion.invoke(param, (Object[])null);
+ HRegion region = hRegionServer.getRegion(regionSpecifier);
+ if (region.getRegionInfo().isMetaTable()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("High priority: " + TextFormat.shortDebugString(param));
+ }
+ return HConstants.HIGH_QOS;
+ }
+ }
+ } catch (Exception ex) {
+ // Not good throwing an exception out of here, a runtime anyways. Let the query go into the
+ // server and have it throw the exception if still an issue. Just mark it normal priority.
+ if (LOG.isDebugEnabled()) LOG.debug("Marking normal priority after getting exception=" + ex);
+ return HConstants.NORMAL_QOS;
+ }
+
+ if (methodName.equals("scan")) { // scanner methods...
+ ScanRequest request = (ScanRequest)param;
+ if (!request.hasScannerId()) {
+ return HConstants.NORMAL_QOS;
+ }
+ RegionScanner scanner = hRegionServer.getScanner(request.getScannerId());
+ if (scanner != null && scanner.getRegionInfo().isMetaRegion()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("High priority scanner request: " + TextFormat.shortDebugString(request));
+ }
+ return HConstants.HIGH_QOS;
+ }
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Low priority: " + TextFormat.shortDebugString(param));
+ }
+ return HConstants.NORMAL_QOS;
+ }
+
+ @VisibleForTesting
+ void setRegionServer(final HRegionServer hrs) {
+ this.hRegionServer = hrs;
+ }
+}
\ No newline at end of file
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java Wed Mar 20 19:39:50 2013
@@ -279,7 +279,7 @@ public class IncrementCoalescer implemen
LOG.error("FAILED_ICV: " + Bytes.toString(row.getTable()) + ", "
+ Bytes.toStringBinary(row.getRowKey()) + ", "
+ Bytes.toStringBinary(row.getFamily()) + ", "
- + Bytes.toStringBinary(row.getQualifier()) + ", " + counter);
+ + Bytes.toStringBinary(row.getQualifier()) + ", " + counter, e);
}
}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java Wed Mar 20 19:39:50 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.util;
import java.io.IOException;
+import java.io.PrintWriter;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
@@ -31,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.ShutdownHook;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* Utility used running a cluster all in the one JVM.
@@ -201,13 +203,22 @@ public class JVMClusterUtil {
// Wait for an active master to be initialized (implies being master)
// with this, when we return the cluster is complete
startTime = System.currentTimeMillis();
+ final int maxwait = 200000;
while (true) {
JVMClusterUtil.MasterThread t = findActiveMaster(masters);
if (t != null && t.master.isInitialized()) {
return t.master.getServerName().toString();
}
- if (System.currentTimeMillis() > startTime + 200000) {
- throw new RuntimeException("Master not initialized after 200 seconds");
+ // REMOVE
+ if (System.currentTimeMillis() > startTime + 10000) {
+
+ Threads.sleep(1000);
+ }
+ if (System.currentTimeMillis() > startTime + maxwait) {
+ String msg = "Master not initialized after " + maxwait + "ms seconds";
+ ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
+ "Thread dump because: " + msg);
+ throw new RuntimeException(msg);
}
try {
Thread.sleep(100);
@@ -279,8 +290,6 @@ public class JVMClusterUtil {
}
}
-
-
if (masters != null) {
for (JVMClusterUtil.MasterThread t : masters) {
while (t.master.isAlive() && !wasInterrupted) {
@@ -306,4 +315,4 @@ public class JVMClusterUtil {
Thread.currentThread().interrupt();
}
}
-}
+}
\ No newline at end of file
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java Wed Mar 20 19:39:50 2013
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
@@ -50,6 +51,8 @@ import org.apache.hadoop.hbase.exception
import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.ipc.HBaseClient;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -60,6 +63,7 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.log4j.Level;
import org.junit.*;
import org.junit.experimental.categories.Category;
@@ -79,6 +83,9 @@ public class TestAdmin {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
+ ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java Wed Mar 20 19:39:50 2013
@@ -23,14 +23,18 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
+import org.apache.hadoop.hbase.ipc.HBaseClient;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -55,6 +59,9 @@ public class TestClientScannerRPCTimeout
@BeforeClass
public static void setUpBeforeClass() throws Exception {
+ ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Wed Mar 20 19:39:50 2013
@@ -47,9 +47,9 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@@ -78,19 +78,23 @@ import org.apache.hadoop.hbase.filter.Wh
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.HBaseClient;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiMutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiRowMutationService;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.log4j.Level;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -120,6 +124,9 @@ public class TestFromClientSide {
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
+ ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
MultiRowMutationEndpoint.class.getName());
@@ -4148,11 +4155,11 @@ public class TestFromClientSide {
HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY);
Put p = new Put(ROW);
p.add(FAMILY, QUALIFIER, VALUE);
- Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p);
+ MutationProto m1 = ProtobufUtil.toMutation(MutationType.PUT, p);
p = new Put(ROW1);
p.add(FAMILY, QUALIFIER, VALUE);
- Mutate m2 = ProtobufUtil.toMutate(MutateType.PUT, p);
+ MutationProto m2 = ProtobufUtil.toMutation(MutationType.PUT, p);
MultiMutateRequest.Builder mrmBuilder = MultiMutateRequest.newBuilder();
mrmBuilder.addMutationRequest(m1);
@@ -4195,6 +4202,8 @@ public class TestFromClientSide {
Delete d = new Delete(ROW);
d.deleteColumns(FAMILY, QUALIFIERS[0]);
arm.add(d);
+ // TODO: Trying mutateRow again. The batch was failing with a one try only.
+ // t.mutateRow(arm);
t.batch(Arrays.asList((Row)arm));
r = t.get(g);
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java Wed Mar 20 19:39:50 2013
@@ -39,4 +39,4 @@ public class TestFromClientSideWithCopro
// We need more than one region server in this test
TEST_UTIL.startMiniCluster(SLAVES);
}
-}
+}
\ No newline at end of file
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Wed Mar 20 19:39:50 2013
@@ -660,6 +660,8 @@ public class TestHCM {
}
}
assertNotNull(otherRow);
+ // If empty row, set it to first row.-f
+ if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
Put put2 = new Put(otherRow);
put2.add(FAM_NAM, otherRow, otherRow);
table.put(put2); //cache put2's location
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java Wed Mar 20 19:39:50 2013
@@ -68,6 +68,9 @@ public class TestMultiParallel {
private static final int slaves = 2; // also used for testing HTable pool size
@BeforeClass public static void beforeClass() throws Exception {
+ ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
UTIL.startMiniCluster(slaves);
HTable t = UTIL.createTable(Bytes.toBytes(TEST_TABLE), Bytes.toBytes(FAMILY));
UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY));
Added: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java?rev=1459015&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java (added)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/CodecPerformance.java Wed Mar 20 19:39:50 2013
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.codec.CellCodec;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.codec.MessageCodec;
+import org.apache.hadoop.hbase.io.CellOutputStream;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Do basic codec performance eval.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CodecPerformance {
+ public static final Log LOG = LogFactory.getLog(CodecPerformance.class);
+
+ static Cell [] getCells(final int howMany) {
+ Cell [] cells = new Cell[howMany];
+ for (int i = 0; i < howMany; i++) {
+ byte [] index = Bytes.toBytes(i);
+ KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, index);
+ cells[i] = kv;
+ }
+ return cells;
+ }
+
+ static int getRoughSize(final Cell [] cells) {
+ int size = 0;
+ for (Cell c: cells) {
+ size += c.getRowLength() + c.getFamilyLength() + c.getQualifierLength() + c.getValueLength();
+ size += Bytes.SIZEOF_LONG + Bytes.SIZEOF_BYTE;
+ }
+ return size;
+ }
+
+ static byte [] runEncoderTest(final int index, final int initialBufferSize,
+ final ByteArrayOutputStream baos, final CellOutputStream encoder, final Cell [] cells)
+ throws IOException {
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < cells.length; i++) {
+ encoder.write(cells[i]);
+ }
+ encoder.flush();
+ LOG.info("" + index + " encoded count=" + cells.length + " in " +
+ (System.currentTimeMillis() - startTime) + "ms for encoder " + encoder);
+ // Ensure we did not have to grow the backing buffer.
+ assertTrue(baos.size() < initialBufferSize);
+ return baos.toByteArray();
+ }
+
+ static Cell [] runDecoderTest(final int index, final int count, final CellScanner decoder)
+ throws IOException {
+ Cell [] cells = new Cell[count];
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; decoder.advance(); i++) {
+ cells[i] = decoder.current();
+ }
+ LOG.info("" + index + " decoded count=" + cells.length + " in " +
+ (System.currentTimeMillis() - startTime) + "ms for decoder " + decoder);
+ // Ensure we did not have to grow the backing buffer.
+ assertTrue(cells.length == count);
+ return cells;
+ }
+
+ static void verifyCells(final Cell [] input, final Cell [] output) {
+ assertEquals(input.length, output.length);
+ for (int i = 0; i < input.length; i ++) {
+ input[i].equals(output[i]);
+ }
+ }
+
+ static void doCodec(final Codec codec, final Cell [] cells, final int cycles, final int count,
+ final int initialBufferSize)
+ throws IOException {
+ byte [] bytes = null;
+ Cell [] cellsDecoded = null;
+ for (int i = 0; i < cycles; i++) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(initialBufferSize);
+ Codec.Encoder encoder = codec.getEncoder(baos);
+ bytes = runEncoderTest(i, initialBufferSize, baos, encoder, cells);
+ }
+ for (int i = 0; i < cycles; i++) {
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ Codec.Decoder decoder = codec.getDecoder(bais);
+ cellsDecoded = CodecPerformance.runDecoderTest(i, count, decoder);
+ }
+ verifyCells(cells, cellsDecoded);
+ }
+
+ public static void main(String[] args) throws IOException {
+ // How many Cells to encode/decode on each cycle.
+ final int count = 100000;
+ // How many times to do an operation; repeat gives hotspot chance to warm up.
+ final int cycles = 30;
+
+ Cell [] cells = getCells(count);
+ int size = getRoughSize(cells);
+ int initialBufferSize = 2 * size; // Multiply by 2 to ensure we don't have to grow buffer
+
+ // Test KeyValue codec.
+ doCodec(new KeyValueCodec(), cells, cycles, count, initialBufferSize);
+ doCodec(new CellCodec(), cells, cycles, count, initialBufferSize);
+ doCodec(new MessageCodec(), cells, cycles, count, initialBufferSize);
+ }
+}
\ No newline at end of file
Added: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java?rev=1459015&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java (added)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/codec/TestCellMessageCodec.java Wed Mar 20 19:39:50 2013
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.MessageCodec;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.io.CountingInputStream;
+import com.google.common.io.CountingOutputStream;
+
+@Category(SmallTests.class)
+public class TestCellMessageCodec {
+ public static final Log LOG = LogFactory.getLog(TestCellMessageCodec.class);
+
+ @Test
+ public void testEmptyWorks() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ CountingOutputStream cos = new CountingOutputStream(baos);
+ DataOutputStream dos = new DataOutputStream(cos);
+ MessageCodec cmc = new MessageCodec();
+ Codec.Encoder encoder = cmc.getEncoder(dos);
+ encoder.flush();
+ dos.close();
+ long offset = cos.getCount();
+ assertEquals(0, offset);
+ CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ DataInputStream dis = new DataInputStream(cis);
+ Codec.Decoder decoder = cmc.getDecoder(dis);
+ assertFalse(decoder.advance());
+ dis.close();
+ assertEquals(0, cis.getCount());
+ }
+
+ @Test
+ public void testOne() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ CountingOutputStream cos = new CountingOutputStream(baos);
+ DataOutputStream dos = new DataOutputStream(cos);
+ MessageCodec cmc = new MessageCodec();
+ Codec.Encoder encoder = cmc.getEncoder(dos);
+ final KeyValue kv =
+ new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
+ encoder.write(kv);
+ encoder.flush();
+ dos.close();
+ long offset = cos.getCount();
+ CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ DataInputStream dis = new DataInputStream(cis);
+ Codec.Decoder decoder = cmc.getDecoder(dis);
+ assertTrue(decoder.advance()); // First read should pull in the KV
+ assertFalse(decoder.advance()); // Second read should trip over the end-of-stream marker and return false
+ dis.close();
+ assertEquals(offset, cis.getCount());
+ }
+
+ @Test
+ public void testThree() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ CountingOutputStream cos = new CountingOutputStream(baos);
+ DataOutputStream dos = new DataOutputStream(cos);
+ MessageCodec cmc = new MessageCodec();
+ Codec.Encoder encoder = cmc.getEncoder(dos);
+ final KeyValue kv1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"), Bytes.toBytes("1"));
+ final KeyValue kv2 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("2"), Bytes.toBytes("2"));
+ final KeyValue kv3 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("3"), Bytes.toBytes("3"));
+ encoder.write(kv1);
+ encoder.write(kv2);
+ encoder.write(kv3);
+ encoder.flush();
+ dos.close();
+ long offset = cos.getCount();
+ CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ DataInputStream dis = new DataInputStream(cis);
+ Codec.Decoder decoder = cmc.getDecoder(dis);
+ assertTrue(decoder.advance());
+ Cell c = decoder.current();
+ assertTrue(CellComparator.equals(c, kv1));
+ assertTrue(decoder.advance());
+ c = decoder.current();
+ assertTrue(CellComparator.equals(c, kv2));
+ assertTrue(decoder.advance());
+ c = decoder.current();
+ assertTrue(CellComparator.equals(c, kv3));
+ assertFalse(decoder.advance());
+ dis.close();
+ assertEquals(offset, cis.getCount());
+ }
+}
\ No newline at end of file
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterWithScanLimits.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterWithScanLimits.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterWithScanLimits.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterWithScanLimits.java Wed Mar 20 19:39:50 2013
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -33,12 +34,15 @@ import org.apache.hadoop.hbase.HTableDes
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.ipc.HBaseClient;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
@@ -46,6 +50,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import org.apache.hadoop.hbase.MediumTests;
+import org.apache.log4j.Level;
import org.junit.experimental.categories.Category;
/**
@@ -172,7 +177,9 @@ public class TestFilterWithScanLimits {
@BeforeClass
public static void setUp() throws Exception {
- Configuration config = TEST_UTIL.getConfiguration();
+ ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
TEST_UTIL.startMiniCluster(1);
initialize(TEST_UTIL.getConfiguration());
}
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java Wed Mar 20 19:39:50 2013
@@ -44,6 +44,8 @@ import org.apache.log4j.spi.LoggingEvent
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.protobuf.ServiceException;
+
/**
* Test that delayed RPCs work. Fire up three calls, the first of which should
* be delayed. Check that the last two, which are undelayed, return before the
@@ -100,8 +102,7 @@ public class TestDelayedRpc {
assertEquals(UNDELAYED, results.get(0).intValue());
assertEquals(UNDELAYED, results.get(1).intValue());
- assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED :
- 0xDEADBEEF);
+ assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED : 0xDEADBEEF);
} finally {
clientEngine.close();
}
@@ -182,7 +183,7 @@ public class TestDelayedRpc {
}
public interface TestRpc extends IpcProtocol {
- TestResponse test(TestArg delay);
+ TestResponse test(final Object rpcController, TestArg delay) throws ServiceException;
}
private static class TestRpcImpl implements TestRpc {
@@ -201,7 +202,8 @@ public class TestDelayedRpc {
}
@Override
- public TestResponse test(final TestArg testArg) {
+ public TestResponse test(final Object rpcController, final TestArg testArg)
+ throws ServiceException {
boolean delay = testArg.getDelay();
TestResponse.Builder responseBuilder = TestResponse.newBuilder();
if (!delay) {
@@ -243,9 +245,8 @@ public class TestDelayedRpc {
@Override
public void run() {
try {
- Integer result =
- new Integer(server.test(TestArg.newBuilder()
- .setDelay(delay).build()).getResponse());
+ Integer result = new Integer(server.test(null, TestArg.newBuilder().setDelay(delay).
+ build()).getResponse());
if (results != null) {
synchronized (results) {
results.add(result);
@@ -276,7 +277,7 @@ public class TestDelayedRpc {
int result = 0xDEADBEEF;
try {
- result = client.test(TestArg.newBuilder().setDelay(false).build()).getResponse();
+ result = client.test(null, TestArg.newBuilder().setDelay(false).build()).getResponse();
} catch (Exception e) {
fail("No exception should have been thrown.");
}
@@ -284,7 +285,7 @@ public class TestDelayedRpc {
boolean caughtException = false;
try {
- result = client.test(TestArg.newBuilder().setDelay(true).build()).getResponse();
+ result = client.test(null, TestArg.newBuilder().setDelay(true).build()).getResponse();
} catch(Exception e) {
// Exception thrown by server is enclosed in a RemoteException.
if (e.getCause().getMessage().contains(
@@ -303,7 +304,7 @@ public class TestDelayedRpc {
*/
private static class FaultyTestRpc implements TestRpc {
@Override
- public TestResponse test(TestArg arg) {
+ public TestResponse test(Object rpcController, TestArg arg) {
if (!arg.getDelay())
return TestResponse.newBuilder().setResponse(UNDELAYED).build();
Delayable call = HBaseServer.getCurrentCall();