You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/04/13 22:28:22 UTC
svn commit: r1325937 [2/7] - in /hbase/trunk/src:
main/java/org/apache/hadoop/hbase/catalog/
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/filter/
main/java/org/apache/hadoop/hbase/io/
main/java/org/apache/hadoop/hbase/ipc...
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Fri Apr 13 20:28:21 2012
@@ -17,12 +17,89 @@
*/
package org.apache.hadoop.hbase.protobuf;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Exec;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UUID;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALEdit.FamilyScope;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALKey;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue.QualifierValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.DeleteType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
+
/**
* Protobufs utility.
*/
-public class ProtobufUtil {
+public final class ProtobufUtil {
+
+ // Private, empty constructor: this final class is not meant to be instantiated.
+ private ProtobufUtil() {
+ }
+
+ /**
+  * Lookup table from the name of a primitive type (as reported by
+  * {@link Class#getName()}, for example <code>int</code>) to its
+  * <code>Class</code> object. <code>void</code> is registered as well.
+  */
+ private final static Map<String, Class<?>> PRIMITIVES =
+   new HashMap<String, Class<?>>();
+
+ static {
+   final Class<?>[] primitiveTypes = {
+     Boolean.TYPE, Byte.TYPE, Character.TYPE, Short.TYPE, Integer.TYPE,
+     Long.TYPE, Float.TYPE, Double.TYPE, Void.TYPE
+   };
+   for (Class<?> primitiveType: primitiveTypes) {
+     PRIMITIVES.put(primitiveType.getName(), primitiveType);
+   }
+ }
+
/**
* Magic we put ahead of a serialized protobuf message.
* For example, all znode content is protobuf messages with the below magic
@@ -56,4 +133,592 @@ public class ProtobufUtil {
public static int lengthOfPBMagic() {
return PB_MAGIC.length;
}
+
+ /**
+  * Unwrap the exception carried by a ServiceException.
+  *
+  * @param se the ServiceException to unwrap
+  * @return the cause of <code>se</code> when that cause is an IOException;
+  *   otherwise a new IOException wrapping <code>se</code> itself (this
+  *   also covers a ServiceException that has no cause)
+  */
+ public static IOException getRemoteException(ServiceException se) {
+   final Throwable cause = se.getCause();
+   // instanceof is false for null, so a missing cause falls through below.
+   if (cause instanceof IOException) {
+     return (IOException) cause;
+   }
+   return new IOException(se);
+ }
+
+ /**
+  * Build a client Exec from its protocol buffer form.
+  *
+  * Each parameter's name is used as the name of its declared type
+  * (primitive names are resolved through PRIMITIVES, anything else through
+  * Class.forName) and its value is deserialized with toObject. The
+  * resulting types are used to look up the target method on the protocol
+  * class. The properties of the message are copied into a new Configuration.
+  *
+  * @param proto the protocol buffer Exec to convert
+  * @return the converted client Exec
+  * @throws IOException if a parameter type or the protocol class cannot be
+  *   loaded, the method is not found on the protocol class, or a parameter
+  *   value cannot be deserialized
+  */
+ @SuppressWarnings("unchecked")
+ public static Exec toExec(
+     final ClientProtos.Exec proto) throws IOException {
+   byte[] row = proto.getRow().toByteArray();
+   String protocolName = proto.getProtocolName();
+   String methodName = proto.getMethodName();
+
+   List<Object> args = new ArrayList<Object>();
+   List<Class<?>> argTypes = new ArrayList<Class<?>>();
+   Class<? extends CoprocessorProtocol> protocol = null;
+   Method method = null;
+   try {
+     for (NameBytesPair parameter: proto.getParameterList()) {
+       String typeName = parameter.getName();
+       // Class.forName cannot resolve primitive names, so try the table first.
+       Class<?> argType = PRIMITIVES.get(typeName);
+       if (argType == null) {
+         argType = Class.forName(typeName);
+       }
+       args.add(toObject(parameter));
+       argTypes.add(argType);
+     }
+     protocol = (Class<? extends CoprocessorProtocol>)
+       Class.forName(protocolName);
+     method = protocol.getMethod(methodName,
+       argTypes.toArray(new Class<?> [argTypes.size()]));
+   } catch (NoSuchMethodException nsme) {
+     throw new IOException(nsme);
+   } catch (ClassNotFoundException cnfe) {
+     throw new IOException(cnfe);
+   }
+
+   Configuration conf = HBaseConfiguration.create();
+   for (NameStringPair property: proto.getPropertyList()) {
+     conf.set(property.getName(), property.getValue());
+   }
+   return new Exec(conf, row, protocol, method, args.toArray());
+ }
+
+ /**
+  * Build a protocol buffer ServerName from a ServerName.
+  *
+  * The host name is always set. The port and the start code are only set
+  * when they are not negative.
+  *
+  * @param serverName the ServerName to convert, may be null
+  * @return the converted protocol buffer ServerName,
+  *   or null if <code>serverName</code> is null
+  */
+ public static HBaseProtos.ServerName
+     toServerName(final ServerName serverName) {
+   if (serverName == null) {
+     return null;
+   }
+   HBaseProtos.ServerName.Builder pb = HBaseProtos.ServerName.newBuilder();
+   pb.setHostName(serverName.getHostname());
+   if (!(serverName.getPort() < 0)) {
+     pb.setPort(serverName.getPort());
+   }
+   if (!(serverName.getStartcode() < 0)) {
+     pb.setStartCode(serverName.getStartcode());
+   }
+   return pb.build();
+ }
+
+ /**
+  * Build a HRegionInfo from a protocol buffer RegionInfo.
+  *
+  * A start or end key that is not present in the message is handed to the
+  * HRegionInfo constructor as null.
+  *
+  * @param proto the RegionInfo to convert, may be null
+  * @return the converted HRegionInfo, or null if <code>proto</code> is null
+  */
+ public static HRegionInfo
+     toRegionInfo(final RegionInfo proto) {
+   if (proto == null) {
+     return null;
+   }
+   final byte[] tableName = proto.getTableName().toByteArray();
+   final long regionId = proto.getRegionId();
+   final byte[] startKey =
+     proto.hasStartKey() ? proto.getStartKey().toByteArray() : null;
+   final byte[] endKey =
+     proto.hasEndKey() ? proto.getEndKey().toByteArray() : null;
+   // NOTE(review): the literal false is presumably the "split" flag of
+   // the HRegionInfo constructor - confirm against HRegionInfo.
+   return new HRegionInfo(tableName, startKey, endKey, false, regionId);
+ }
+
+ /**
+  * Build a protocol buffer RegionInfo from a HRegionInfo.
+  *
+  * The table name and region id are always copied. The start and end keys
+  * are only set when the HRegionInfo returns a non-null key.
+  *
+  * @param info the HRegionInfo to convert, may be null
+  * @return the converted RegionInfo, or null if <code>info</code> is null
+  */
+ public static RegionInfo
+     toRegionInfo(final HRegionInfo info) {
+   if (info == null) {
+     return null;
+   }
+   RegionInfo.Builder pb = RegionInfo.newBuilder();
+   pb.setTableName(ByteString.copyFrom(info.getTableName()));
+   pb.setRegionId(info.getRegionId());
+   if (null != info.getStartKey()) {
+     pb.setStartKey(ByteString.copyFrom(info.getStartKey()));
+   }
+   if (null != info.getEndKey()) {
+     pb.setEndKey(ByteString.copyFrom(info.getEndKey()));
+   }
+   return pb.build();
+ }
+
+ /**
+  * Build a client Get from a protocol buffer Get.
+  *
+  * Optional settings (lock id, cache blocks, max versions, time range,
+  * filter) are only applied when present in the message. A column entry
+  * without qualifiers selects the whole family.
+  *
+  * @param proto the protocol buffer Get to convert, may be null
+  * @return the converted client Get, or null if <code>proto</code> is null
+  * @throws IOException
+  */
+ public static Get toGet(
+     final ClientProtos.Get proto) throws IOException {
+   if (proto == null) {
+     return null;
+   }
+   final byte[] row = proto.getRow().toByteArray();
+   final RowLock rowLock =
+     proto.hasLockId() ? new RowLock(proto.getLockId()) : null;
+   final Get get = new Get(row, rowLock);
+   if (proto.hasCacheBlocks()) {
+     get.setCacheBlocks(proto.getCacheBlocks());
+   }
+   if (proto.hasMaxVersions()) {
+     get.setMaxVersions(proto.getMaxVersions());
+   }
+   if (proto.hasTimeRange()) {
+     final HBaseProtos.TimeRange range = proto.getTimeRange();
+     // A missing bound defaults to 0 (from) or Long.MAX_VALUE (to).
+     final long minStamp = range.hasFrom() ? range.getFrom() : 0L;
+     final long maxStamp = range.hasTo() ? range.getTo() : Long.MAX_VALUE;
+     get.setTimeRange(minStamp, maxStamp);
+   }
+   if (proto.hasFilter()) {
+     get.setFilter((Filter)toObject(proto.getFilter()));
+   }
+   for (NameBytesPair attribute: proto.getAttributeList()) {
+     get.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
+   }
+   for (Column column: proto.getColumnList()) {
+     final byte[] family = column.getFamily().toByteArray();
+     if (column.getQualifierCount() == 0) {
+       get.addFamily(family);
+       continue;
+     }
+     for (ByteString qualifier: column.getQualifierList()) {
+       get.addColumn(family, qualifier.toByteArray());
+     }
+   }
+   return get;
+ }
+
+ /**
+  * Build a client Put from a protocol buffer Mutate of type PUT.
+  *
+  * The mutate-level timestamp (HConstants.LATEST_TIMESTAMP when absent) is
+  * used for every qualifier value that carries no timestamp of its own.
+  *
+  * @param proto the protocol buffer Mutate to convert
+  * @return the converted client Put
+  * @throws DoNotRetryIOException if a qualifier value has no value set
+  */
+ public static Put toPut(
+     final Mutate proto) throws DoNotRetryIOException {
+   MutateType type = proto.getMutateType();
+   assert type == MutateType.PUT : type.name();
+   final byte[] row = proto.getRow().toByteArray();
+   final long timestamp = proto.hasTimestamp()
+     ? proto.getTimestamp() : HConstants.LATEST_TIMESTAMP;
+   final RowLock lock =
+     proto.hasLockId() ? new RowLock(proto.getLockId()) : null;
+   final Put put = new Put(row, timestamp, lock);
+   put.setWriteToWAL(proto.getWriteToWAL());
+   for (NameBytesPair attribute: proto.getAttributeList()) {
+     put.setAttribute(attribute.getName(),
+       attribute.getValue().toByteArray());
+   }
+   for (ColumnValue column: proto.getColumnValueList()) {
+     final byte[] family = column.getFamily().toByteArray();
+     for (QualifierValue qv: column.getQualifierValueList()) {
+       final byte[] qualifier = qv.getQualifier().toByteArray();
+       if (!qv.hasValue()) {
+         throw new DoNotRetryIOException(
+           "Missing required field: qualifer value");
+       }
+       final byte[] value = qv.getValue().toByteArray();
+       final long ts = qv.hasTimestamp() ? qv.getTimestamp() : timestamp;
+       put.add(family, qualifier, ts, value);
+     }
+   }
+   return put;
+ }
+
+ /**
+  * Build a client Delete from a protocol buffer Mutate of type DELETE.
+  *
+  * Each qualifier value is dispatched on its delete type:
+  * DELETE_ONE_VERSION calls deleteColumn, DELETE_MULTIPLE_VERSIONS calls
+  * deleteColumns, and any other type calls deleteFamily. A qualifier value
+  * without a timestamp uses HConstants.LATEST_TIMESTAMP, and one without a
+  * qualifier passes a null qualifier.
+  *
+  * @param proto the protocol buffer Mutate to convert
+  * @return the converted client Delete
+  */
+ public static Delete toDelete(final Mutate proto) {
+   MutateType type = proto.getMutateType();
+   assert type == MutateType.DELETE : type.name();
+   final byte[] row = proto.getRow().toByteArray();
+   final long timestamp = proto.hasTimestamp()
+     ? proto.getTimestamp() : HConstants.LATEST_TIMESTAMP;
+   final RowLock lock =
+     proto.hasLockId() ? new RowLock(proto.getLockId()) : null;
+   final Delete delete = new Delete(row, timestamp, lock);
+   delete.setWriteToWAL(proto.getWriteToWAL());
+   for (NameBytesPair attribute: proto.getAttributeList()) {
+     delete.setAttribute(attribute.getName(),
+       attribute.getValue().toByteArray());
+   }
+   for (ColumnValue column: proto.getColumnValueList()) {
+     final byte[] family = column.getFamily().toByteArray();
+     for (QualifierValue qv: column.getQualifierValueList()) {
+       final DeleteType deleteType = qv.getDeleteType();
+       final byte[] qualifier =
+         qv.hasQualifier() ? qv.getQualifier().toByteArray() : null;
+       final long ts = qv.hasTimestamp()
+         ? qv.getTimestamp() : HConstants.LATEST_TIMESTAMP;
+       if (deleteType == DeleteType.DELETE_ONE_VERSION) {
+         delete.deleteColumn(family, qualifier, ts);
+         continue;
+       }
+       if (deleteType == DeleteType.DELETE_MULTIPLE_VERSIONS) {
+         delete.deleteColumns(family, qualifier, ts);
+         continue;
+       }
+       delete.deleteFamily(family, ts);
+     }
+   }
+   return delete;
+ }
+
+ /**
+  * Build a client Append from a protocol buffer Mutate of type APPEND.
+  *
+  * @param proto the protocol buffer Mutate to convert
+  * @return the converted client Append
+  * @throws DoNotRetryIOException if a qualifier value has no value set
+  */
+ public static Append toAppend(
+     final Mutate proto) throws DoNotRetryIOException {
+   MutateType type = proto.getMutateType();
+   assert type == MutateType.APPEND : type.name();
+   final Append append = new Append(proto.getRow().toByteArray());
+   append.setWriteToWAL(proto.getWriteToWAL());
+   for (NameBytesPair attribute: proto.getAttributeList()) {
+     append.setAttribute(attribute.getName(),
+       attribute.getValue().toByteArray());
+   }
+   for (ColumnValue column: proto.getColumnValueList()) {
+     final byte[] family = column.getFamily().toByteArray();
+     for (QualifierValue qv: column.getQualifierValueList()) {
+       final byte[] qualifier = qv.getQualifier().toByteArray();
+       if (!qv.hasValue()) {
+         throw new DoNotRetryIOException(
+           "Missing required field: qualifer value");
+       }
+       append.add(family, qualifier, qv.getValue().toByteArray());
+     }
+   }
+   return append;
+ }
+
+ /**
+  * Build a client Increment from a protocol buffer Mutate of type INCREMENT.
+  *
+  * The bytes of each qualifier value are decoded with Bytes.toLong and
+  * used as the amount to add to that column.
+  *
+  * @param proto the protocol buffer Mutate to convert
+  * @return the converted client Increment
+  * @throws IOException if a qualifier value has no value set (raised as
+  *   DoNotRetryIOException), or propagated from setting the time range
+  */
+ public static Increment toIncrement(
+     final Mutate proto) throws IOException {
+   MutateType type = proto.getMutateType();
+   assert type == MutateType.INCREMENT : type.name();
+   final RowLock lock =
+     proto.hasLockId() ? new RowLock(proto.getLockId()) : null;
+   final Increment increment =
+     new Increment(proto.getRow().toByteArray(), lock);
+   increment.setWriteToWAL(proto.getWriteToWAL());
+   if (proto.hasTimeRange()) {
+     final HBaseProtos.TimeRange range = proto.getTimeRange();
+     // A missing bound defaults to 0 (from) or Long.MAX_VALUE (to).
+     final long minStamp = range.hasFrom() ? range.getFrom() : 0L;
+     final long maxStamp = range.hasTo() ? range.getTo() : Long.MAX_VALUE;
+     increment.setTimeRange(minStamp, maxStamp);
+   }
+   for (ColumnValue column: proto.getColumnValueList()) {
+     final byte[] family = column.getFamily().toByteArray();
+     for (QualifierValue qv: column.getQualifierValueList()) {
+       final byte[] qualifier = qv.getQualifier().toByteArray();
+       if (!qv.hasValue()) {
+         throw new DoNotRetryIOException(
+           "Missing required field: qualifer value");
+       }
+       final long amount = Bytes.toLong(qv.getValue().toByteArray());
+       increment.addColumn(family, qualifier, amount);
+     }
+   }
+   return increment;
+ }
+
+ /**
+  * Build a client Scan from a protocol buffer Scan.
+  *
+  * A missing start or stop row falls back to HConstants.EMPTY_START_ROW or
+  * HConstants.EMPTY_END_ROW. Other optional settings (cache blocks, max
+  * versions, time range, filter, batch size) are only applied when present.
+  * A column entry without qualifiers selects the whole family.
+  *
+  * @param proto the protocol buffer Scan to convert
+  * @return the converted client Scan
+  * @throws IOException
+  */
+ public static Scan toScan(
+     final ClientProtos.Scan proto) throws IOException {
+   final byte [] startRow = proto.hasStartRow()
+     ? proto.getStartRow().toByteArray() : HConstants.EMPTY_START_ROW;
+   final byte [] stopRow = proto.hasStopRow()
+     ? proto.getStopRow().toByteArray() : HConstants.EMPTY_END_ROW;
+   final Scan scan = new Scan(startRow, stopRow);
+   if (proto.hasCacheBlocks()) {
+     scan.setCacheBlocks(proto.getCacheBlocks());
+   }
+   if (proto.hasMaxVersions()) {
+     scan.setMaxVersions(proto.getMaxVersions());
+   }
+   if (proto.hasTimeRange()) {
+     final HBaseProtos.TimeRange range = proto.getTimeRange();
+     // A missing bound defaults to 0 (from) or Long.MAX_VALUE (to).
+     final long minStamp = range.hasFrom() ? range.getFrom() : 0L;
+     final long maxStamp = range.hasTo() ? range.getTo() : Long.MAX_VALUE;
+     scan.setTimeRange(minStamp, maxStamp);
+   }
+   if (proto.hasFilter()) {
+     scan.setFilter((Filter)toObject(proto.getFilter()));
+   }
+   if (proto.hasBatchSize()) {
+     scan.setBatch(proto.getBatchSize());
+   }
+   for (NameBytesPair attribute: proto.getAttributeList()) {
+     scan.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
+   }
+   for (Column column: proto.getColumnList()) {
+     final byte[] family = column.getFamily().toByteArray();
+     if (column.getQualifierCount() == 0) {
+       scan.addFamily(family);
+       continue;
+     }
+     for (ByteString qualifier: column.getQualifierList()) {
+       scan.addColumn(family, qualifier.toByteArray());
+     }
+   }
+   return scan;
+ }
+
+ /**
+  * Build a protocol buffer Result from a client Result.
+  *
+  * Each KeyValue is copied out of its backing buffer (offset and length
+  * taken from the KeyValue) into its own ByteString. A Result whose list
+  * is null yields a message without key values.
+  *
+  * @param result the client Result to convert
+  * @return the converted protocol buffer Result
+  */
+ public static ClientProtos.Result toResult(final Result result) {
+   final List<ByteString> serialized = new ArrayList<ByteString>();
+   final List<KeyValue> cells = result.list();
+   if (cells != null) {
+     for (KeyValue cell: cells) {
+       serialized.add(ByteString.copyFrom(
+         cell.getBuffer(), cell.getOffset(), cell.getLength()));
+     }
+   }
+   ClientProtos.Result.Builder pb = ClientProtos.Result.newBuilder();
+   pb.addAllKeyValueBytes(serialized);
+   return pb.build();
+ }
+
+ /**
+  * Build a client Result from a protocol buffer Result.
+  *
+  * Every entry of the message's key value bytes list becomes one KeyValue
+  * backed by a fresh byte array, in the same order.
+  *
+  * @param proto the protocol buffer Result to convert
+  * @return the converted client Result
+  */
+ public static Result toResult(final ClientProtos.Result proto) {
+   final List<ByteString> serialized = proto.getKeyValueBytesList();
+   final int count = serialized.size();
+   final List<KeyValue> cells = new ArrayList<KeyValue>(count);
+   for (int i = 0; i < count; i++) {
+     cells.add(new KeyValue(serialized.get(i).toByteArray()));
+   }
+   return new Result(cells);
+ }
+
+ /**
+  * Get the HLog entries from a list of protocol buffer WALEntry
+  *
+  * For every WALEntry a HLogKey is built from the WAL key (the cluster id
+  * defaults to HConstants.DEFAULT_CLUSTER_ID when absent) and a WALEdit is
+  * filled with the entry's key values. Family scopes, when present, are
+  * attached to the edit as a map from family to scope.
+  *
+  * @param protoList the list of protocol buffer WALEntry
+  * @return an array of HLog entries, in the order of <code>protoList</code>
+  */
+ public static HLog.Entry[]
+     toHLogEntries(final List<WALEntry> protoList) {
+   List<HLog.Entry> entries = new ArrayList<HLog.Entry>(protoList.size());
+   for (WALEntry entry: protoList) {
+     WALKey walKey = entry.getWalKey();
+     java.util.UUID clusterId = HConstants.DEFAULT_CLUSTER_ID;
+     if (walKey.hasClusterId()) {
+       UUID protoUuid = walKey.getClusterId();
+       clusterId = new java.util.UUID(
+         protoUuid.getMostSigBits(), protoUuid.getLeastSigBits());
+     }
+     HLogKey key = new HLogKey(walKey.getEncodedRegionName().toByteArray(),
+       walKey.getTableName().toByteArray(), walKey.getLogSequenceNumber(),
+       walKey.getWriteTime(), clusterId);
+     WALEntry.WALEdit walEdit = entry.getEdit();
+     WALEdit edit = new WALEdit();
+     for (ByteString keyValue: walEdit.getKeyValueList()) {
+       edit.add(new KeyValue(keyValue.toByteArray()));
+     }
+     if (walEdit.getFamilyScopeCount() > 0) {
+       // byte[] is not Comparable, so a TreeMap keyed by byte[] needs an
+       // explicit comparator; without one put() fails with a
+       // ClassCastException.
+       TreeMap<byte[], Integer> scopes =
+         new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+       for (FamilyScope scope: walEdit.getFamilyScopeList()) {
+         // NOTE(review): the scope is stored as the enum ordinal, not the
+         // protobuf field number - confirm the two agree for ScopeType.
+         scopes.put(scope.getFamily().toByteArray(),
+           Integer.valueOf(scope.getScopeType().ordinal()));
+       }
+       edit.setScopes(scopes);
+     }
+     entries.add(new HLog.Entry(key, edit));
+   }
+   return entries.toArray(new HLog.Entry[entries.size()]);
+ }
+
+ /**
+  * Deserialize the value of a protocol buffer Parameter into a Java object
+  * using HbaseObjectWritable.
+  *
+  * @param parameter the protocol buffer Parameter to convert, may be null
+  * @return the converted Java object, or null if <code>parameter</code>
+  *   is null or carries no value
+  * @throws IOException if failed to deserialize the parameter
+  */
+ public static Object toObject(
+     final NameBytesPair parameter) throws IOException {
+   if (parameter == null) {
+     return null;
+   }
+   if (!parameter.hasValue()) {
+     return null;
+   }
+   final ByteArrayInputStream source =
+     new ByteArrayInputStream(parameter.getValue().toByteArray());
+   try {
+     DataInput in = new DataInputStream(source);
+     return HbaseObjectWritable.readObject(in, null);
+   } finally {
+     source.close();
+   }
+ }
+
+ /**
+  * Recreate an exception from a stringified protocol buffer Parameter.
+  *
+  * The parameter name is taken as the exception class name and the value,
+  * read as UTF-8, as its message. The exception is instantiated through
+  * the class's constructor that takes a single String.
+  *
+  * @param parameter the protocol buffer Parameter to convert, may be null
+  * @return the converted Exception, or null if <code>parameter</code>
+  *   is null or carries no value
+  * @throws IOException if the class cannot be loaded or instantiated;
+  *   the underlying failure is kept as the cause
+  */
+ @SuppressWarnings("unchecked")
+ public static Throwable toException(
+     final NameBytesPair parameter) throws IOException {
+   if (parameter == null) {
+     return null;
+   }
+   if (!parameter.hasValue()) {
+     return null;
+   }
+   final String message = parameter.getValue().toStringUtf8();
+   final String className = parameter.getName();
+   try {
+     Class<? extends Throwable> exceptionClass =
+       (Class<? extends Throwable>)Class.forName(className);
+     Constructor<? extends Throwable> stringCtor =
+       exceptionClass.getDeclaredConstructor(String.class);
+     return stringCtor.newInstance(message);
+   } catch (Exception e) {
+     throw new IOException(e);
+   }
+ }
+
+ /**
+  * Serialize a Java Object into a Parameter, using the object's runtime
+  * class as the declared class (Object.class when the value is null).
+  * The Java Object should be a Writable or protocol buffer Message
+  *
+  * @param value the Writable/Message object to be serialized, may be null
+  * @return the converted protocol buffer Parameter
+  * @throws IOException if failed to serialize the object
+  */
+ public static NameBytesPair toParameter(
+     final Object value) throws IOException {
+   final Class<?> declaredClass;
+   if (value == null) {
+     declaredClass = Object.class;
+   } else {
+     declaredClass = value.getClass();
+   }
+   return toParameter(declaredClass, value);
+ }
+
+ /**
+  * Serialize a Java Object into a Parameter. The Java Object should be a
+  * Writable or protocol buffer Message
+  *
+  * The Parameter name is always the name of the declared class. A null
+  * value produces a Parameter without a value.
+  *
+  * @param declaredClass the declared class of the parameter
+  * @param value the Writable/Message object to be serialized, may be null
+  * @return the converted protocol buffer Parameter
+  * @throws IOException if failed to serialize the object
+  */
+ public static NameBytesPair toParameter(
+     final Class<?> declaredClass, final Object value) throws IOException {
+   NameBytesPair.Builder pb = NameBytesPair.newBuilder();
+   pb.setName(declaredClass.getName());
+   if (value == null) {
+     return pb.build();
+   }
+   final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+   try {
+     DataOutput out = new DataOutputStream(buffer);
+     // Write under the value's runtime class when HbaseObjectWritable has
+     // no class code for the declared class.
+     final Class<?> writeAs =
+       HbaseObjectWritable.getClassCode(declaredClass) != null
+         ? declaredClass : value.getClass();
+     HbaseObjectWritable.writeObject(out, value, writeAs, null);
+     pb.setValue(ByteString.copyFrom(buffer.toByteArray()));
+   } finally {
+     buffer.close();
+   }
+   return pb.build();
+ }
}
\ No newline at end of file
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java?rev=1325937&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java Fri Apr 13 20:28:21 2012
@@ -0,0 +1,782 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.protobuf;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Action;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Exec;
+import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.CompareType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue.QualifierValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.DeleteType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+
+/**
+ * Helper utility to build protocol buffer requests,
+ * or build components for protocol buffer requests.
+ */
+@InterfaceAudience.Private
+public final class RequestConverter {
+
+ // Private, empty constructor: this final class is not meant to be instantiated.
+ private RequestConverter() {
+ }
+
+// Start utilities for Client
+
+/**
+ * Build a protocol buffer GetRequest for one row, limited to a single
+ * column family, with the closest-row-before flag set as given.
+ *
+ * @param regionName the name of the region to get
+ * @param row the row to get
+ * @param family the column family to get
+ * @param closestRowBefore if the requested row doesn't exist,
+ * should return the immediate row before
+ * @return a protocol buffer GetRequest
+ */
+ public static GetRequest buildGetRequest(final byte[] regionName,
+     final byte[] row, final byte[] family, boolean closestRowBefore) {
+   final RegionSpecifier region = buildRegionSpecifier(
+     RegionSpecifierType.REGION_NAME, regionName);
+
+   // The Get carries the row and one column entry naming only the family.
+   Column.Builder familyColumn = Column.newBuilder();
+   familyColumn.setFamily(ByteString.copyFrom(family));
+   ClientProtos.Get.Builder get = ClientProtos.Get.newBuilder();
+   get.setRow(ByteString.copyFrom(row));
+   get.addColumn(familyColumn.build());
+
+   GetRequest.Builder request = GetRequest.newBuilder();
+   request.setClosestRowBefore(closestRowBefore);
+   request.setRegion(region);
+   request.setGet(get.build());
+   return request.build();
+ }
+
+ /**
+  * Build a protocol buffer GetRequest for a client Get, with the
+  * existence-only flag turned off.
+  *
+  * @param regionName the name of the region to get
+  * @param get the client Get
+  * @return a protocol buffer GetRequest
+  * @throws IOException propagated from the three-argument overload
+  */
+ public static GetRequest buildGetRequest(final byte[] regionName,
+     final Get get) throws IOException {
+   final boolean existenceOnly = false;
+   return buildGetRequest(regionName, get, existenceOnly);
+ }
+
+ /**
+  * Build a protocol buffer GetRequest for a client Get.
+  *
+  * @param regionName the name of the region to get
+  * @param get the client Get
+  * @param existenceOnly indicate if check row existence only
+  * @return a protocol buffer GetRequest
+  * @throws IOException propagated from converting the client Get
+  */
+ public static GetRequest buildGetRequest(final byte[] regionName,
+     final Get get, final boolean existenceOnly) throws IOException {
+   final RegionSpecifier region = buildRegionSpecifier(
+     RegionSpecifierType.REGION_NAME, regionName);
+   GetRequest.Builder request = GetRequest.newBuilder();
+   request.setExistenceOnly(existenceOnly);
+   request.setRegion(region);
+   request.setGet(buildGet(get));
+   return request.build();
+ }
+
+ /**
+  * Build a protocol buffer MutateRequest that increments a single column.
+  *
+  * The request carries a Mutate of type INCREMENT with one column value
+  * whose value is <code>amount</code> encoded with Bytes.toBytes.
+  *
+  * @param regionName the name of the region the request is addressed to
+  * @param row the row to increment
+  * @param family the column family of the column to increment
+  * @param qualifier the qualifier of the column to increment
+  * @param amount the amount to increment by
+  * @param writeToWAL the write-to-WAL flag set on the Mutate
+  * @return a mutate request
+  */
+ public static MutateRequest buildMutateRequest(
+     final byte[] regionName, final byte[] row, final byte[] family,
+     final byte [] qualifier, final long amount, final boolean writeToWAL) {
+   final RegionSpecifier region = buildRegionSpecifier(
+     RegionSpecifierType.REGION_NAME, regionName);
+   MutateRequest.Builder request = MutateRequest.newBuilder();
+   request.setRegion(region);
+
+   Mutate.Builder mutate = Mutate.newBuilder();
+   mutate.setRow(ByteString.copyFrom(row));
+   mutate.setMutateType(MutateType.INCREMENT);
+   mutate.setWriteToWAL(writeToWAL);
+
+   ColumnValue.Builder column = ColumnValue.newBuilder();
+   column.setFamily(ByteString.copyFrom(family));
+   QualifierValue.Builder increment = QualifierValue.newBuilder();
+   increment.setValue(ByteString.copyFrom(Bytes.toBytes(amount)));
+   increment.setQualifier(ByteString.copyFrom(qualifier));
+   column.addQualifierValue(increment.build());
+   mutate.addColumnValue(column.build());
+
+   request.setMutate(mutate.build());
+   return request.build();
+ }
+
+ /**
+  * Build a protocol buffer MutateRequest for a conditioned put: the
+  * request carries the Put as a Mutate of type PUT together with a
+  * Condition built from the row, family, qualifier, comparator and
+  * compare type.
+  *
+  * @param regionName the name of the region the request is addressed to
+  * @param row the row used in the condition
+  * @param family the column family used in the condition
+  * @param qualifier the column qualifier used in the condition
+  * @param comparator the comparator used in the condition
+  * @param compareType the compare type used in the condition
+  * @param put the client Put to send
+  * @return a mutate request
+  * @throws IOException
+  */
+ public static MutateRequest buildMutateRequest(
+     final byte[] regionName, final byte[] row, final byte[] family,
+     final byte [] qualifier, final WritableByteArrayComparable comparator,
+     final CompareType compareType, final Put put) throws IOException {
+   final RegionSpecifier region = buildRegionSpecifier(
+     RegionSpecifierType.REGION_NAME, regionName);
+   final Condition condition = buildCondition(
+     row, family, qualifier, comparator, compareType);
+   MutateRequest.Builder request = MutateRequest.newBuilder();
+   request.setRegion(region);
+   request.setMutate(buildMutate(MutateType.PUT, put));
+   request.setCondition(condition);
+   return request.build();
+ }
+
+ /**
+  * Build a protocol buffer MutateRequest for a conditioned delete: the
+  * request carries the delete together with a Condition built from the
+  * given row, family, qualifier, comparator and compare type.
+  *
+  * @param regionName name of the region the request is addressed to
+  * @param row the row used in the condition
+  * @param family the column family used in the condition
+  * @param qualifier the column qualifier used in the condition
+  * @param comparator the comparator used in the condition
+  * @param compareType the comparison operator used in the condition
+  * @param delete the delete to send
+  * @return a mutate request
+  * @throws IOException propagated from building the condition or the mutate
+  */
+ public static MutateRequest buildMutateRequest(
+     final byte[] regionName, final byte[] row, final byte[] family,
+     final byte [] qualifier, final WritableByteArrayComparable comparator,
+     final CompareType compareType, final Delete delete) throws IOException {
+   RegionSpecifier target = buildRegionSpecifier(
+     RegionSpecifierType.REGION_NAME, regionName);
+   MutateRequest.Builder requestBuilder =
+     MutateRequest.newBuilder().setRegion(target);
+   // The condition is built before the mutate, as in the original order.
+   Condition check = buildCondition(
+     row, family, qualifier, comparator, compareType);
+   Mutate protoDelete = buildMutate(MutateType.DELETE, delete);
+   requestBuilder.setMutate(protoDelete);
+   requestBuilder.setCondition(check);
+   return requestBuilder.build();
+ }
+
+ /**
+  * Build a protocol buffer MutateRequest wrapping a client Put.
+  *
+  * @param regionName name of the region the request is addressed to
+  * @param put the put to send
+  * @return a mutate request
+  * @throws IOException propagated from converting the put
+  */
+ public static MutateRequest buildMutateRequest(
+     final byte[] regionName, final Put put) throws IOException {
+   RegionSpecifier target = buildRegionSpecifier(
+     RegionSpecifierType.REGION_NAME, regionName);
+   return MutateRequest.newBuilder()
+     .setRegion(target)
+     .setMutate(buildMutate(MutateType.PUT, put))
+     .build();
+ }
+
+ /**
+  * Build a protocol buffer MutateRequest wrapping a client Append.
+  *
+  * @param regionName name of the region the request is addressed to
+  * @param append the append to send
+  * @return a mutate request
+  * @throws IOException propagated from converting the append
+  */
+ public static MutateRequest buildMutateRequest(
+     final byte[] regionName, final Append append) throws IOException {
+   RegionSpecifier target = buildRegionSpecifier(
+     RegionSpecifierType.REGION_NAME, regionName);
+   return MutateRequest.newBuilder()
+     .setRegion(target)
+     .setMutate(buildMutate(MutateType.APPEND, append))
+     .build();
+ }
+
+ /**
+  * Build a protocol buffer MutateRequest wrapping a client Increment.
+  *
+  * @param regionName name of the region the request is addressed to
+  * @param increment the increment to send
+  * @return a mutate request
+  */
+ public static MutateRequest buildMutateRequest(
+     final byte[] regionName, final Increment increment) {
+   RegionSpecifier target = buildRegionSpecifier(
+     RegionSpecifierType.REGION_NAME, regionName);
+   return MutateRequest.newBuilder()
+     .setRegion(target)
+     .setMutate(buildMutate(increment))
+     .build();
+ }
+
+ /**
+  * Build a protocol buffer MutateRequest wrapping a client Delete.
+  *
+  * @param regionName name of the region the request is addressed to
+  * @param delete the delete to send
+  * @return a mutate request
+  * @throws IOException propagated from converting the delete
+  */
+ public static MutateRequest buildMutateRequest(
+     final byte[] regionName, final Delete delete) throws IOException {
+   RegionSpecifier target = buildRegionSpecifier(
+     RegionSpecifierType.REGION_NAME, regionName);
+   return MutateRequest.newBuilder()
+     .setRegion(target)
+     .setMutate(buildMutate(MutateType.DELETE, delete))
+     .build();
+ }
+
+ /**
+  * Build a protocol buffer MultiRequest for a set of row mutations.
+  * The request is flagged atomic and every mutation is added as one action.
+  *
+  * @param regionName name of the region the request is addressed to
+  * @param rowMutations the mutations to send; only Put and Delete are accepted
+  * @return a multi request
+  * @throws IOException a DoNotRetryIOException if a mutation is neither a
+  *   Put nor a Delete, or whatever converting a mutation throws
+  */
+ public static MultiRequest buildMultiRequest(final byte[] regionName,
+     final RowMutations rowMutations) throws IOException {
+   RegionSpecifier target = buildRegionSpecifier(
+     RegionSpecifierType.REGION_NAME, regionName);
+   MultiRequest.Builder requestBuilder = MultiRequest.newBuilder()
+     .setRegion(target)
+     .setAtomic(true);
+   for (Mutation mutation: rowMutations.getMutations()) {
+     final boolean isPut = mutation instanceof Put;
+     // Reject anything that is neither a put nor a delete up front.
+     if (!isPut && !(mutation instanceof Delete)) {
+       throw new DoNotRetryIOException(
+         "RowMutations supports only put and delete, not "
+           + mutation.getClass().getName());
+     }
+     final MutateType kind = isPut ? MutateType.PUT : MutateType.DELETE;
+     Mutate protoMutate = buildMutate(kind, mutation);
+     requestBuilder.addAction(ProtobufUtil.toParameter(protoMutate));
+   }
+   return requestBuilder.build();
+ }
+
+ /**
+  * Build a protocol buffer ScanRequest from a client Scan.
+  * <p>
+  * The time range is only sent when the scan is not "all time", start and
+  * stop rows only when they are non-empty, and the filter only when the
+  * scan has one. Every family in the scan's family map is added as a
+  * column, with its non-null qualifiers (possibly none).
+  *
+  * @param regionName name of the region the request is addressed to
+  * @param scan the client scan to convert
+  * @param numberOfRows value copied into the request's numberOfRows field
+  * @param closeScanner value copied into the request's closeScanner field
+  * @return a scan request
+  * @throws IOException propagated from converting the filter
+  */
+ public static ScanRequest buildScanRequest(final byte[] regionName,
+     final Scan scan, final int numberOfRows,
+     final boolean closeScanner) throws IOException {
+   ScanRequest.Builder requestBuilder = ScanRequest.newBuilder();
+   RegionSpecifier target = buildRegionSpecifier(
+     RegionSpecifierType.REGION_NAME, regionName);
+   requestBuilder.setNumberOfRows(numberOfRows)
+     .setCloseScanner(closeScanner)
+     .setRegion(target);
+
+   ClientProtos.Scan.Builder protoScan = ClientProtos.Scan.newBuilder()
+     .setCacheBlocks(scan.getCacheBlocks())
+     .setBatchSize(scan.getBatch())
+     .setMaxVersions(scan.getMaxVersions());
+
+   // Time range: omitted when it does not restrict the scan.
+   TimeRange range = scan.getTimeRange();
+   if (!range.isAllTime()) {
+     protoScan.setTimeRange(HBaseProtos.TimeRange.newBuilder()
+       .setFrom(range.getMin())
+       .setTo(range.getMax())
+       .build());
+   }
+
+   // Attributes: one name/bytes pair per map entry.
+   Map<String, byte[]> attributes = scan.getAttributesMap();
+   if (!attributes.isEmpty()) {
+     for (Map.Entry<String, byte[]> attribute: attributes.entrySet()) {
+       protoScan.addAttribute(NameBytesPair.newBuilder()
+         .setName(attribute.getKey())
+         .setValue(ByteString.copyFrom(attribute.getValue()))
+         .build());
+     }
+   }
+
+   // Row boundaries: null or empty arrays are left unset.
+   byte[] firstRow = scan.getStartRow();
+   if (firstRow != null && firstRow.length > 0) {
+     protoScan.setStartRow(ByteString.copyFrom(firstRow));
+   }
+   byte[] lastRow = scan.getStopRow();
+   if (lastRow != null && lastRow.length > 0) {
+     protoScan.setStopRow(ByteString.copyFrom(lastRow));
+   }
+
+   if (scan.hasFilter()) {
+     protoScan.setFilter(ProtobufUtil.toParameter(scan.getFilter()));
+   }
+
+   // Columns: one Column per family, built from a fresh builder each time.
+   for (Map.Entry<byte[],NavigableSet<byte []>>
+       familyEntry: scan.getFamilyMap().entrySet()) {
+     Column.Builder protoColumn = Column.newBuilder();
+     protoColumn.setFamily(ByteString.copyFrom(familyEntry.getKey()));
+     NavigableSet<byte []> qualifiers = familyEntry.getValue();
+     if (qualifiers != null && !qualifiers.isEmpty()) {
+       for (byte [] qualifier: qualifiers) {
+         if (qualifier == null) {
+           continue;
+         }
+         protoColumn.addQualifier(ByteString.copyFrom(qualifier));
+       }
+     }
+     protoScan.addColumn(protoColumn.build());
+   }
+
+   requestBuilder.setScan(protoScan.build());
+   return requestBuilder.build();
+ }
+
+ /**
+  * Build a protocol buffer ScanRequest that refers to a scanner by its id
+  * (no region or scan is set on the request).
+  *
+  * @param scannerId value copied into the request's scannerId field
+  * @param numberOfRows value copied into the request's numberOfRows field
+  * @param closeScanner value copied into the request's closeScanner field
+  * @return a scan request
+  */
+ public static ScanRequest buildScanRequest(final long scannerId,
+     final int numberOfRows, final boolean closeScanner) {
+   return ScanRequest.newBuilder()
+     .setNumberOfRows(numberOfRows)
+     .setCloseScanner(closeScanner)
+     .setScannerId(scannerId)
+     .build();
+ }
+
+ /**
+  * Build a protocol buffer LockRowRequest for a single row.
+  *
+  * @param regionName name of the region the request is addressed to
+  * @param row the row added to the request
+  * @return a lock row request
+  */
+ public static LockRowRequest buildLockRowRequest(
+     final byte[] regionName, final byte[] row) {
+   RegionSpecifier target = buildRegionSpecifier(
+     RegionSpecifierType.REGION_NAME, regionName);
+   return LockRowRequest.newBuilder()
+     .setRegion(target)
+     .addRow(ByteString.copyFrom(row))
+     .build();
+ }
+
+ /**
+  * Build a protocol buffer UnlockRowRequest.
+  *
+  * @param regionName name of the region the request is addressed to
+  * @param lockId value copied into the request's lockId field
+  * @return an unlock row request
+  */
+ public static UnlockRowRequest buildUnlockRowRequest(
+     final byte[] regionName, final long lockId) {
+   RegionSpecifier target = buildRegionSpecifier(
+     RegionSpecifierType.REGION_NAME, regionName);
+   return UnlockRowRequest.newBuilder()
+     .setRegion(target)
+     .setLockId(lockId)
+     .build();
+ }
+
+ /**
+  * Build a protocol buffer bulk load request.
+  *
+  * @param familyPaths (family, path) pairs; each pair is added to the
+  *   request as one FamilyPath, in list order
+  * @param regionName name of the region the request is addressed to
+  * @return a bulk load request
+  */
+ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
+     final List<Pair<byte[], String>> familyPaths, final byte[] regionName) {
+   BulkLoadHFileRequest.Builder requestBuilder =
+     BulkLoadHFileRequest.newBuilder();
+   requestBuilder.setRegion(
+     buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
+   for (Pair<byte[], String> pair: familyPaths) {
+     FamilyPath protoPair = FamilyPath.newBuilder()
+       .setFamily(ByteString.copyFrom(pair.getFirst()))
+       .setPath(pair.getSecond())
+       .build();
+     requestBuilder.addFamilyPath(protoPair);
+   }
+   return requestBuilder.build();
+ }
+
+ /**
+  * Build a protocol buffer coprocessor exec request wrapping a client Exec.
+  *
+  * @param regionName name of the region the request is addressed to
+  * @param exec the client Exec to convert and set as the request's call
+  * @return a coprocessor exec request
+  * @throws IOException propagated from converting the exec
+  */
+ public static ExecCoprocessorRequest buildExecCoprocessorRequest(
+     final byte[] regionName, final Exec exec) throws IOException {
+   RegionSpecifier target = buildRegionSpecifier(
+     RegionSpecifierType.REGION_NAME, regionName);
+   return ExecCoprocessorRequest.newBuilder()
+     .setRegion(target)
+     .setCall(buildExec(exec))
+     .build();
+ }
+
+ /**
+  * Build a protocol buffer multi request for a list of actions.
+  * Get, Put, Delete, Exec, Append and Increment rows are converted and
+  * added in list order; RowMutations in the list (if any) are skipped.
+  *
+  * @param regionName name of the region the request is addressed to
+  * @param actions the actions to convert
+  * @return a multi request
+  * @throws IOException a DoNotRetryIOException for an unsupported row type,
+  *   or whatever converting an action throws
+  */
+ public static <R> MultiRequest buildMultiRequest(final byte[] regionName,
+     final List<Action<R>> actions) throws IOException {
+   MultiRequest.Builder requestBuilder = MultiRequest.newBuilder();
+   requestBuilder.setRegion(
+     buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
+   for (Action<R> action: actions) {
+     final Row row = action.getAction();
+     final Message converted;
+     if (row instanceof Get) {
+       converted = buildGet((Get)row);
+     } else if (row instanceof Put) {
+       converted = buildMutate(MutateType.PUT, (Put)row);
+     } else if (row instanceof Delete) {
+       converted = buildMutate(MutateType.DELETE, (Delete)row);
+     } else if (row instanceof Exec) {
+       converted = buildExec((Exec)row);
+     } else if (row instanceof Append) {
+       converted = buildMutate(MutateType.APPEND, (Append)row);
+     } else if (row instanceof Increment) {
+       converted = buildMutate((Increment)row);
+     } else if (row instanceof RowMutations) {
+       // RowMutations are not sent through this request; skip them.
+       continue;
+     } else {
+       throw new DoNotRetryIOException(
+         "multi doesn't support " + row.getClass().getName());
+     }
+     requestBuilder.addAction(ProtobufUtil.toParameter(converted));
+   }
+   return requestBuilder.build();
+ }
+
+// End utilities for Client
+
+ /**
+  * Build a protocol buffer Condition from its parts.
+  *
+  * @param row the row copied into the condition
+  * @param family the column family copied into the condition
+  * @param qualifier the column qualifier copied into the condition
+  * @param comparator the comparator, converted with ProtobufUtil.toParameter
+  * @param compareType the comparison operator set on the condition
+  * @return a Condition
+  * @throws IOException propagated from converting the comparator
+  */
+ private static Condition buildCondition(final byte[] row,
+     final byte[] family, final byte [] qualifier,
+     final WritableByteArrayComparable comparator,
+     final CompareType compareType) throws IOException {
+   return Condition.newBuilder()
+     .setRow(ByteString.copyFrom(row))
+     .setFamily(ByteString.copyFrom(family))
+     .setQualifier(ByteString.copyFrom(qualifier))
+     .setComparator(ProtobufUtil.toParameter(comparator))
+     .setCompareType(compareType)
+     .build();
+ }
+
+ /**
+  * Build a protocol buffer Exec from a client Exec.
+  * <p>
+  * When the exec has a configuration, each of its entries is added as a
+  * property. Protocol name, method name and row are always copied, and
+  * each parameter is converted together with its declared class.
+  *
+  * @param exec the client Exec to convert
+  * @return the protocol buffer Exec
+  * @throws IOException propagated from converting a parameter
+  */
+ private static ClientProtos.Exec buildExec(
+     final Exec exec) throws IOException {
+   ClientProtos.Exec.Builder protoExec = ClientProtos.Exec.newBuilder();
+
+   Configuration conf = exec.getConf();
+   if (conf != null) {
+     for (Iterator<Entry<String, String>> it = conf.iterator();
+         it.hasNext();) {
+       Entry<String, String> property = it.next();
+       protoExec.addProperty(NameStringPair.newBuilder()
+         .setName(property.getKey())
+         .setValue(property.getValue())
+         .build());
+     }
+   }
+
+   protoExec.setProtocolName(exec.getProtocolName())
+     .setMethodName(exec.getMethodName())
+     .setRow(ByteString.copyFrom(exec.getRow()));
+
+   Object[] arguments = exec.getParameters();
+   if (arguments != null && arguments.length > 0) {
+     // Declared classes are read index-for-index alongside the arguments.
+     Class<?>[] argumentTypes = exec.getParameterClasses();
+     for (int index = 0; index < arguments.length; index++) {
+       protoExec.addParameter(
+         ProtobufUtil.toParameter(argumentTypes[index], arguments[index]));
+     }
+   }
+   return protoExec.build();
+ }
+
+ /**
+  * Build a protocol buffer Get from a client Get.
+  * <p>
+  * The lock id is only sent when it is non-negative, the filter only when
+  * present, and the time range only when it is not "all time". When the
+  * get has families, each one is added as a column with its non-null
+  * qualifiers (possibly none).
+  *
+  * @param get the client Get
+  * @return a protocol buffer Get
+  * @throws IOException propagated from converting the filter
+  */
+ private static ClientProtos.Get buildGet(
+     final Get get) throws IOException {
+   ClientProtos.Get.Builder protoGet = ClientProtos.Get.newBuilder()
+     .setRow(ByteString.copyFrom(get.getRow()))
+     .setCacheBlocks(get.getCacheBlocks())
+     .setMaxVersions(get.getMaxVersions());
+   if (get.getLockId() >= 0) {
+     protoGet.setLockId(get.getLockId());
+   }
+   if (get.getFilter() != null) {
+     protoGet.setFilter(ProtobufUtil.toParameter(get.getFilter()));
+   }
+
+   // Time range: omitted when it does not restrict the get.
+   TimeRange range = get.getTimeRange();
+   if (!range.isAllTime()) {
+     protoGet.setTimeRange(HBaseProtos.TimeRange.newBuilder()
+       .setFrom(range.getMin())
+       .setTo(range.getMax())
+       .build());
+   }
+
+   // Attributes: one name/bytes pair per map entry.
+   Map<String, byte[]> attributes = get.getAttributesMap();
+   if (!attributes.isEmpty()) {
+     for (Map.Entry<String, byte[]> attribute: attributes.entrySet()) {
+       protoGet.addAttribute(NameBytesPair.newBuilder()
+         .setName(attribute.getKey())
+         .setValue(ByteString.copyFrom(attribute.getValue()))
+         .build());
+     }
+   }
+
+   // Columns: one Column per family, built from a fresh builder each time.
+   if (get.hasFamilies()) {
+     Map<byte[], NavigableSet<byte[]>> familyMap = get.getFamilyMap();
+     for (Map.Entry<byte[], NavigableSet<byte[]>> familyEntry:
+         familyMap.entrySet()) {
+       NavigableSet<byte[]> qualifiers = familyEntry.getValue();
+       Column.Builder protoColumn = Column.newBuilder();
+       protoColumn.setFamily(ByteString.copyFrom(familyEntry.getKey()));
+       if (qualifiers != null && !qualifiers.isEmpty()) {
+         for (byte[] qualifier: qualifiers) {
+           if (qualifier == null) {
+             continue;
+           }
+           protoColumn.addQualifier(ByteString.copyFrom(qualifier));
+         }
+       }
+       protoGet.addColumn(protoColumn.build());
+     }
+   }
+   return protoGet.build();
+ }
+
+ /**
+  * Build a protocol buffer Mutate of type INCREMENT from a client Increment.
+  * <p>
+  * The lock id is only sent when it is non-negative and the time range
+  * only when it is not "all time". Every family in the increment's family
+  * map becomes one column value; each qualifier/amount pair in it becomes
+  * one qualifier value whose value is the amount encoded by
+  * Bytes.toBytes(long).
+  *
+  * @param increment the client Increment
+  * @return a protocol buffer Mutate
+  */
+ private static Mutate buildMutate(final Increment increment) {
+   Mutate.Builder protoMutate = Mutate.newBuilder()
+     .setRow(ByteString.copyFrom(increment.getRow()))
+     .setMutateType(MutateType.INCREMENT)
+     .setWriteToWAL(increment.getWriteToWAL());
+   if (increment.getLockId() >= 0) {
+     protoMutate.setLockId(increment.getLockId());
+   }
+
+   TimeRange range = increment.getTimeRange();
+   if (!range.isAllTime()) {
+     protoMutate.setTimeRange(HBaseProtos.TimeRange.newBuilder()
+       .setFrom(range.getMin())
+       .setTo(range.getMax())
+       .build());
+   }
+
+   for (Map.Entry<byte[],NavigableMap<byte[], Long>>
+       familyEntry: increment.getFamilyMap().entrySet()) {
+     ColumnValue.Builder protoFamily = ColumnValue.newBuilder();
+     protoFamily.setFamily(ByteString.copyFrom(familyEntry.getKey()));
+     NavigableMap<byte[], Long> amounts = familyEntry.getValue();
+     if (amounts != null && !amounts.isEmpty()) {
+       for (Map.Entry<byte[], Long> amount: amounts.entrySet()) {
+         protoFamily.addQualifierValue(QualifierValue.newBuilder()
+           .setQualifier(ByteString.copyFrom(amount.getKey()))
+           .setValue(ByteString.copyFrom(
+             Bytes.toBytes(amount.getValue().longValue())))
+           .build());
+       }
+     }
+     // The family is added even when it carries no qualifier values.
+     protoMutate.addColumnValue(protoFamily.build());
+   }
+   return protoMutate.build();
+ }
+
+ /**
+ * Create a protocol buffer Mutate based on a client Mutation
+ *
+ * @param mutateType
+ * @param mutation
+ * @return a mutate
+ * @throws IOException
+ */
+ private static Mutate buildMutate(final MutateType mutateType,
+ final Mutation mutation) throws IOException {
+ Mutate.Builder mutateBuilder = Mutate.newBuilder();
+ mutateBuilder.setRow(ByteString.copyFrom(mutation.getRow()));
+ mutateBuilder.setMutateType(mutateType);
+ mutateBuilder.setWriteToWAL(mutation.getWriteToWAL());
+ if (mutation.getLockId() >= 0) {
+ mutateBuilder.setLockId(mutation.getLockId());
+ }
+ mutateBuilder.setTimestamp(mutation.getTimeStamp());
+ Map<String, byte[]> attributes = mutation.getAttributesMap();
+ if (!attributes.isEmpty()) {
+ NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
+ for (Map.Entry<String, byte[]> attribute: attributes.entrySet()) {
+ attributeBuilder.setName(attribute.getKey());
+ attributeBuilder.setValue(ByteString.copyFrom(attribute.getValue()));
+ mutateBuilder.addAttribute(attributeBuilder.build());
+ }
+ }
+ ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
+ QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
+ for (Map.Entry<byte[],List<KeyValue>>
+ family: mutation.getFamilyMap().entrySet()) {
+ columnBuilder.setFamily(ByteString.copyFrom(family.getKey()));
+ columnBuilder.clearQualifierValue();
+ for (KeyValue value: family.getValue()) {
+ valueBuilder.setQualifier(ByteString.copyFrom(value.getQualifier()));
+ valueBuilder.setValue(ByteString.copyFrom(value.getValue()));
+ valueBuilder.setTimestamp(value.getTimestamp());
+ if (mutateType == MutateType.DELETE) {
+ KeyValue.Type keyValueType = KeyValue.Type.codeToType(value.getType());
+ valueBuilder.setDeleteType(toDeleteType(keyValueType));
+ }
+ columnBuilder.addQualifierValue(valueBuilder.build());
+ }
+ mutateBuilder.addColumnValue(columnBuilder.build());
+ }
+ return mutateBuilder.build();
+ }
+
+ /**
+  * Wrap a byte array and a specifier type in a protocol buffer
+  * RegionSpecifier.
+  *
+  * @param type the region specifier type
+  * @param value the region specifier byte array value
+  * @return a protocol buffer RegionSpecifier
+  */
+ private static RegionSpecifier buildRegionSpecifier(
+     final RegionSpecifierType type, final byte[] value) {
+   return RegionSpecifier.newBuilder()
+     .setValue(ByteString.copyFrom(value))
+     .setType(type)
+     .build();
+ }
+
+ /**
+ * Convert a delete KeyValue type to protocol buffer DeleteType.
+ *
+ * @param type
+ * @return
+ * @throws IOException
+ */
+ private static DeleteType toDeleteType(
+ KeyValue.Type type) throws IOException {
+ switch (type) {
+ case Delete:
+ return DeleteType.DELETE_ONE_VERSION;
+ case DeleteColumn:
+ return DeleteType.DELETE_MULTIPLE_VERSIONS;
+ case DeleteFamily:
+ return DeleteType.DELETE_FAMILY;
+ default:
+ throw new IOException("Unknown delete type: " + type);
+ }
+ }
+}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java?rev=1325937&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java Fri Apr 13 20:28:21 2012
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.protobuf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Helper utility to build protocol buffer responses,
+ * or retrieve data from protocol buffer responses.
+ */
+@InterfaceAudience.Private
+public final class ResponseConverter {
+
+  private ResponseConverter() {
+  }
+
+// Start utilities for Client
+
+  /**
+   * Get the client Results from a protocol buffer ScanResponse
+   *
+   * @param response the protocol buffer ScanResponse
+   * @return the client Results in the response, in response order;
+   *   null if the response is null
+   */
+  public static Result[] getResults(final ScanResponse response) {
+    if (response == null) return null;
+    int count = response.getResultCount();
+    Result[] results = new Result[count];
+    for (int i = 0; i < count; i++) {
+      results[i] = ProtobufUtil.toResult(response.getResult(i));
+    }
+    return results;
+  }
+
+  /**
+   * Get the results from a protocol buffer MultiResponse
+   * <p>
+   * Each action result yields one list element: the converted exception
+   * if it has one; otherwise its converted value (a protocol buffer Result
+   * is further converted to a client Result); otherwise a new empty
+   * client Result.
+   *
+   * @param proto the protocol buffer MultiResponse to convert
+   * @return the results in the MultiResponse
+   * @throws IOException propagated from converting an exception or value
+   */
+  public static List<Object> getResults(
+      final ClientProtos.MultiResponse proto) throws IOException {
+    List<Object> results = new ArrayList<Object>();
+    List<ActionResult> resultList = proto.getResultList();
+    for (int i = 0, n = resultList.size(); i < n; i++) {
+      ActionResult result = resultList.get(i);
+      if (result.hasException()) {
+        results.add(ProtobufUtil.toException(result.getException()));
+      } else if (result.hasValue()) {
+        Object value = ProtobufUtil.toObject(result.getValue());
+        if (value instanceof ClientProtos.Result) {
+          results.add(ProtobufUtil.toResult((ClientProtos.Result)value));
+        } else {
+          results.add(value);
+        }
+      } else {
+        results.add(new Result());
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Wrap a throwable to an action result.
+   * <p>
+   * The exception is stored as a name/bytes pair: the name is the
+   * throwable's class name and the value is its stringified form
+   * (StringUtils.stringifyException) encoded as UTF-8.
+   *
+   * @param t the throwable to wrap
+   * @return an action result
+   */
+  public static ActionResult buildActionResult(final Throwable t) {
+    ActionResult.Builder builder = ActionResult.newBuilder();
+    NameBytesPair.Builder parameterBuilder = NameBytesPair.newBuilder();
+    parameterBuilder.setName(t.getClass().getName());
+    parameterBuilder.setValue(
+      ByteString.copyFromUtf8(StringUtils.stringifyException(t)));
+    builder.setException(parameterBuilder.build());
+    return builder.build();
+  }
+
+// End utilities for Client
+// Start utilities for Admin
+
+  /**
+   * Get the list of regions to flush from a RollWALWriterResponse
+   *
+   * @param proto the RollWALWriterResponse
+   * @return the list of regions to flush; null if the response is null
+   *   or lists no region
+   */
+  public static byte[][] getRegions(final RollWALWriterResponse proto) {
+    if (proto == null || proto.getRegionToFlushCount() == 0) return null;
+    List<byte[]> regions =
+      new ArrayList<byte[]>(proto.getRegionToFlushCount());
+    for (ByteString region: proto.getRegionToFlushList()) {
+      regions.add(region.toByteArray());
+    }
+    // The no-arg toArray() returns an Object[], which cannot be cast to
+    // byte[][] (ClassCastException); pass a typed array instead.
+    return regions.toArray(new byte[regions.size()][]);
+  }
+
+  /**
+   * Get the list of region info from a GetOnlineRegionResponse
+   *
+   * @param proto the GetOnlineRegionResponse
+   * @return the list of region info; null if the response is null or
+   *   contains no region info
+   */
+  public static List<HRegionInfo> getRegionInfos
+      (final GetOnlineRegionResponse proto) {
+    if (proto == null || proto.getRegionInfoCount() == 0) return null;
+    List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
+    for (RegionInfo regionInfo: proto.getRegionInfoList()) {
+      regionInfos.add(ProtobufUtil.toRegionInfo(regionInfo));
+    }
+    return regionInfos;
+  }
+
+  /**
+   * Get the region info from a GetRegionInfoResponse
+   *
+   * @param proto the GetRegionInfoResponse
+   * @return the region info; null if the response is null
+   */
+  public static HRegionInfo getRegionInfo
+      (final GetRegionInfoResponse proto) {
+    // NOTE(review): generated protobuf getters usually return a default
+    // instance rather than null, so the second check may never be true;
+    // confirm against the message definition before relying on it.
+    if (proto == null || proto.getRegionInfo() == null) return null;
+    return ProtobufUtil.toRegionInfo(proto.getRegionInfo());
+  }
+
+  /**
+   * Get the region opening state from an OpenRegionResponse
+   *
+   * @param proto the OpenRegionResponse
+   * @return the region opening state; null if the response is null or
+   *   does not contain exactly one opening state
+   */
+  public static RegionOpeningState getRegionOpeningState
+      (final OpenRegionResponse proto) {
+    if (proto == null || proto.getOpeningStateCount() != 1) return null;
+    // The protocol buffer enum is mapped to the client enum by name.
+    return RegionOpeningState.valueOf(
+      proto.getOpeningState(0).name());
+  }
+
+  /**
+   * Check if the region is closed from a CloseRegionResponse
+   *
+   * @param proto the CloseRegionResponse
+   * @return the region close state; false if the response is null or
+   *   has no closed flag
+   */
+  public static boolean isClosed
+      (final CloseRegionResponse proto) {
+    if (proto == null || !proto.hasClosed()) return false;
+    return proto.getClosed();
+  }
+
+// End utilities for Admin
+}